Repository: flink Updated Branches: refs/heads/master 70e71c161 -> 7eb45c133
[FLINK-4590] [table] Some Table API tests are failing when debug lvl is set to DEBUG This closes #2504. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7eb45c13 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7eb45c13 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7eb45c13 Branch: refs/heads/master Commit: 7eb45c133c49933b14719f06bf68ccf162a3e0b2 Parents: 70e71c1 Author: twalthr <twal...@apache.org> Authored: Fri Sep 16 10:52:28 2016 +0200 Committer: twalthr <twal...@apache.org> Committed: Mon Sep 26 18:48:47 2016 +0200 ---------------------------------------------------------------------- .../table/plan/nodes/dataset/BatchScan.scala | 10 ++--- .../nodes/dataset/BatchTableSourceScan.scala | 6 +-- .../plan/nodes/dataset/DataSetAggregate.scala | 28 +++++++------- .../table/plan/nodes/dataset/DataSetCalc.scala | 16 ++++---- .../plan/nodes/dataset/DataSetIntersect.scala | 16 ++++---- .../table/plan/nodes/dataset/DataSetJoin.scala | 20 +++++----- .../table/plan/nodes/dataset/DataSetMinus.scala | 39 +++++++++++++------- .../table/plan/nodes/dataset/DataSetScan.scala | 6 +-- .../table/plan/nodes/dataset/DataSetSort.scala | 22 ++++++----- .../table/plan/nodes/dataset/DataSetUnion.scala | 19 ++++++---- .../plan/nodes/dataset/DataSetValues.scala | 16 ++++---- .../plan/nodes/datastream/DataStreamCalc.scala | 16 ++++---- .../plan/nodes/datastream/DataStreamScan.scala | 6 +-- .../plan/nodes/datastream/DataStreamUnion.scala | 16 ++++---- .../nodes/datastream/DataStreamValues.scala | 12 +++--- .../plan/nodes/datastream/StreamScan.scala | 8 ++-- .../src/test/resources/log4j-test.properties | 27 ++++++++++++++ .../src/test/resources/logback-test.xml | 29 +++++++++++++++ 18 files changed, 193 insertions(+), 119 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/flink/blob/7eb45c13/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchScan.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchScan.scala index 85ed6ef..15b2081 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchScan.scala @@ -36,14 +36,14 @@ abstract class BatchScan( cluster: RelOptCluster, traitSet: RelTraitSet, table: RelOptTable, - rowType: RelDataType) + rowRelDataType: RelDataType) extends TableScan(cluster, traitSet, table) with DataSetRel { - override def deriveRowType() = rowType + override def deriveRowType() = rowRelDataType override def toString: String = { - s"Source(from: (${rowType.getFieldNames.asScala.toList.mkString(", ")}))" + s"Source(from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")}))" } override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = { @@ -81,14 +81,14 @@ abstract class BatchScan( val mapFunc = getConversionMapper( config, - false, + nullableInput = false, inputType, determinedType, "DataSetSourceConversion", getRowType.getFieldNames, Some(flinkTable.fieldIndexes)) - val opName = s"from: (${rowType.getFieldNames.asScala.toList.mkString(", ")})" + val opName = s"from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})" input.map(mapFunc).name(opName) } http://git-wip-us.apache.org/repos/asf/flink/blob/7eb45c13/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchTableSourceScan.scala ---------------------------------------------------------------------- 
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchTableSourceScan.scala index 027a5be..10d9534 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchTableSourceScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchTableSourceScan.scala @@ -35,15 +35,15 @@ class BatchTableSourceScan( rowType: RelDataType) extends BatchScan(cluster, traitSet, table, rowType) { - val tableSourceTable = table.unwrap(classOf[TableSourceTable]) + val tableSourceTable = getTable.unwrap(classOf[TableSourceTable]) val tableSource = tableSourceTable.tableSource.asInstanceOf[BatchTableSource[_]] override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { new BatchTableSourceScan( cluster, traitSet, - table, - rowType + getTable, + getRowType ) } http://git-wip-us.apache.org/repos/asf/flink/blob/7eb45c13/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala index 8aa18ca..c826d83 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala @@ -40,13 +40,13 @@ class DataSetAggregate( traitSet: RelTraitSet, input: RelNode, namedAggregates: Seq[CalcitePair[AggregateCall, String]], - rowType: RelDataType, + rowRelDataType: RelDataType, inputType: 
RelDataType, grouping: Array[Int]) extends SingleRel(cluster, traitSet, input) with DataSetRel { - override def deriveRowType() = rowType + override def deriveRowType() = rowRelDataType override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { new DataSetAggregate( @@ -54,7 +54,7 @@ class DataSetAggregate( traitSet, inputs.get(0), namedAggregates, - rowType, + getRowType, inputType, grouping) } @@ -91,15 +91,15 @@ class DataSetAggregate( val groupingKeys = grouping.indices.toArray // add grouping fields, position keys in the input, and input type val aggregateResult = AggregateUtil.createOperatorFunctionsForAggregates(namedAggregates, - inputType, rowType, grouping, config) + inputType, getRowType, grouping, config) - val inputDS = input.asInstanceOf[DataSetRel].translateToPlan( + val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan( tableEnv, // tell the input operator that this operator currently only supports Rows as input Some(TypeConverter.DEFAULT_ROW_TYPE)) // get the output types - val fieldTypes: Array[TypeInformation[_]] = rowType.getFieldList.asScala + val fieldTypes: Array[TypeInformation[_]] = getRowType.getFieldList.asScala .map(field => FlinkTypeFactory.toTypeInfo(field.getType)) .toArray @@ -138,14 +138,14 @@ class DataSetAggregate( // if the expected type is not a Row, inject a mapper to convert to the expected type expectedType match { case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] => - val mapName = s"convert: (${rowType.getFieldNames.asScala.toList.mkString(", ")})" + val mapName = s"convert: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})" result.map(getConversionMapper( - config, - false, - rowTypeInfo.asInstanceOf[TypeInformation[Any]], - expectedType.get, - "DataSetAggregateConversion", - rowType.getFieldNames.asScala + config = config, + nullableInput = false, + inputType = rowTypeInfo.asInstanceOf[TypeInformation[Any]], + expectedType = expectedType.get, + 
conversionOperatorName = "DataSetAggregateConversion", + fieldNames = getRowType.getFieldNames.asScala )) .name(mapName) case _ => result @@ -161,7 +161,7 @@ class DataSetAggregate( private def aggregationToString: String = { val inFields = inputType.getFieldNames.asScala - val outFields = rowType.getFieldNames.asScala + val outFields = getRowType.getFieldNames.asScala val groupStrings = grouping.map( inFields(_) ) http://git-wip-us.apache.org/repos/asf/flink/blob/7eb45c13/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala index 75e4fd2..6d10089 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala @@ -40,32 +40,32 @@ class DataSetCalc( cluster: RelOptCluster, traitSet: RelTraitSet, input: RelNode, - rowType: RelDataType, + rowRelDataType: RelDataType, calcProgram: RexProgram, ruleDescription: String) extends SingleRel(cluster, traitSet, input) with FlinkCalc with DataSetRel { - override def deriveRowType() = rowType + override def deriveRowType() = rowRelDataType override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { new DataSetCalc( cluster, traitSet, inputs.get(0), - rowType, + getRowType, calcProgram, ruleDescription) } - override def toString: String = calcToString(calcProgram, getExpressionString(_, _, _)) + override def toString: String = calcToString(calcProgram, getExpressionString) override def explainTerms(pw: RelWriter): RelWriter = { super.explainTerms(pw) - .item("select", 
selectionToString(calcProgram, getExpressionString(_, _, _))) + .item("select", selectionToString(calcProgram, getExpressionString)) .itemIf("where", - conditionToString(calcProgram, getExpressionString(_, _, _)), + conditionToString(calcProgram, getExpressionString), calcProgram.getCondition != null) } @@ -95,7 +95,7 @@ class DataSetCalc( val config = tableEnv.getConfig - val inputDS = input.asInstanceOf[DataSetRel].translateToPlan(tableEnv) + val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv) val returnType = determineReturnType( getRowType, @@ -120,7 +120,7 @@ class DataSetCalc( returnType) val mapFunc = calcMapFunction(genFunction) - inputDS.flatMap(mapFunc).name(calcOpName(calcProgram, getExpressionString(_, _, _))) + inputDS.flatMap(mapFunc).name(calcOpName(calcProgram, getExpressionString)) } } http://git-wip-us.apache.org/repos/asf/flink/blob/7eb45c13/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetIntersect.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetIntersect.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetIntersect.scala index 042c28b..d2203d0 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetIntersect.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetIntersect.scala @@ -38,14 +38,14 @@ import scala.collection.JavaConverters._ class DataSetIntersect( cluster: RelOptCluster, traitSet: RelTraitSet, - left: RelNode, - right: RelNode, - rowType: RelDataType, + leftNode: RelNode, + rightNode: RelNode, + rowRelDataType: RelDataType, all: Boolean) - extends BiRel(cluster, traitSet, left, right) + extends BiRel(cluster, traitSet, leftNode, rightNode) with DataSetRel { - override def 
deriveRowType() = rowType + override def deriveRowType() = rowRelDataType override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { new DataSetIntersect( @@ -53,7 +53,7 @@ class DataSetIntersect( traitSet, inputs.get(0), inputs.get(1), - rowType, + getRowType, all ) } @@ -115,7 +115,7 @@ class DataSetIntersect( "DataSetIntersectConversion", getRowType.getFieldNames) - val opName = s"convert: (${rowType.getFieldNames.asScala.toList.mkString(", ")})" + val opName = s"convert: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})" intersectDs.map(mapFunc).name(opName) } @@ -127,7 +127,7 @@ class DataSetIntersect( } private def intersectSelectionToString: String = { - rowType.getFieldNames.asScala.toList.mkString(", ") + getRowType.getFieldNames.asScala.toList.mkString(", ") } } http://git-wip-us.apache.org/repos/asf/flink/blob/7eb45c13/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala index 50a9b2d..bbb6325 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala @@ -44,9 +44,9 @@ import scala.collection.mutable.ArrayBuffer class DataSetJoin( cluster: RelOptCluster, traitSet: RelTraitSet, - left: RelNode, - right: RelNode, - rowType: RelDataType, + leftNode: RelNode, + rightNode: RelNode, + rowRelDataType: RelDataType, joinCondition: RexNode, joinRowType: RelDataType, joinInfo: JoinInfo, @@ -54,10 +54,10 @@ class DataSetJoin( joinType: JoinRelType, joinHint: JoinHint, ruleDescription: String) - extends 
BiRel(cluster, traitSet, left, right) + extends BiRel(cluster, traitSet, leftNode, rightNode) with DataSetRel { - override def deriveRowType() = rowType + override def deriveRowType() = rowRelDataType override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { new DataSetJoin( @@ -65,7 +65,7 @@ class DataSetJoin( traitSet, inputs.get(0), inputs.get(1), - rowType, + getRowType, joinCondition, joinRowType, joinInfo, @@ -113,7 +113,7 @@ class DataSetJoin( val rightKeys = ArrayBuffer.empty[Int] if (keyPairs.isEmpty) { // if no equality keys => not supported - throw new TableException( + throw TableException( "Joins should have at least one equality condition.\n" + s"\tLeft: ${left.toString},\n" + s"\tRight: ${right.toString},\n" + @@ -135,7 +135,7 @@ class DataSetJoin( leftKeys.add(pair.source) rightKeys.add(pair.target) } else { - throw new TableException( + throw TableException( "Equality join predicate on incompatible types.\n" + s"\tLeft: ${left.toString},\n" + s"\tRight: ${right.toString},\n" + @@ -156,7 +156,7 @@ class DataSetJoin( } if (nullCheck && !config.getNullCheck) { - throw new TableException("Null check in TableConfig must be enabled for outer joins.") + throw TableException("Null check in TableConfig must be enabled for outer joins.") } val generator = new CodeGenerator( @@ -205,7 +205,7 @@ class DataSetJoin( } private def joinSelectionToString: String = { - rowType.getFieldNames.asScala.toList.mkString(", ") + getRowType.getFieldNames.asScala.toList.mkString(", ") } private def joinConditionToString: String = { http://git-wip-us.apache.org/repos/asf/flink/blob/7eb45c13/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMinus.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMinus.scala 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMinus.scala index d3a2fe7..6a5cbd1 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMinus.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMinus.scala @@ -38,14 +38,14 @@ import scala.collection.JavaConverters._ class DataSetMinus( cluster: RelOptCluster, traitSet: RelTraitSet, - left: RelNode, - right: RelNode, - rowType: RelDataType, + leftNode: RelNode, + rightNode: RelNode, + rowRelDataType: RelDataType, all: Boolean) - extends BiRel(cluster, traitSet, left, right) + extends BiRel(cluster, traitSet, leftNode, rightNode) with DataSetRel { - override def deriveRowType() = rowType + override def deriveRowType() = rowRelDataType override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { new DataSetMinus( @@ -53,7 +53,7 @@ class DataSetMinus( traitSet, inputs.get(0), inputs.get(1), - rowType, + getRowType, all ) } @@ -75,6 +75,17 @@ class DataSetMinus( } } + override def estimateRowCount(mq: RelMetadataQuery): Double = { + // from org.apache.calcite.rel.metadata.RelMdUtil.getMinusRowCount + val children = this.getInputs + var rowCnt = mq.getRowCount(children.head) + getInputs.tail.foreach(rowCnt -= 0.5 * mq.getRowCount(_)) + if (rowCnt < 0) { + rowCnt = 0.0 + } + rowCnt + } + override def translateToPlan( tableEnv: BatchTableEnvironment, expectedType: Option[TypeInformation[Any]]): DataSet[Any] = { @@ -108,14 +119,14 @@ class DataSetMinus( // conversion if (determinedType != leftType) { val mapFunc = getConversionMapper( - config, - false, - leftType, - determinedType, - "DataSetMinusConversion", - getRowType.getFieldNames) + config = config, + nullableInput = false, + inputType = leftType, + expectedType = determinedType, + conversionOperatorName = "DataSetMinusConversion", + fieldNames = getRowType.getFieldNames) - val 
opName = s"convert: (${rowType.getFieldNames.asScala.toList.mkString(", ")})" + val opName = s"convert: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})" minusDs.map(mapFunc).name(opName) } @@ -127,7 +138,7 @@ class DataSetMinus( } private def minusSelectionToString: String = { - rowType.getFieldNames.asScala.toList.mkString(", ") + getRowType.getFieldNames.asScala.toList.mkString(", ") } } http://git-wip-us.apache.org/repos/asf/flink/blob/7eb45c13/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetScan.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetScan.scala index 17a7db2..3c34bc3 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetScan.scala @@ -38,14 +38,14 @@ class DataSetScan( rowType: RelDataType) extends BatchScan(cluster, traitSet, table, rowType) { - val dataSetTable: DataSetTable[Any] = table.unwrap(classOf[DataSetTable[Any]]) + val dataSetTable: DataSetTable[Any] = getTable.unwrap(classOf[DataSetTable[Any]]) override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { new DataSetScan( cluster, traitSet, - table, - rowType + getTable, + getRowType ) } http://git-wip-us.apache.org/repos/asf/flink/blob/7eb45c13/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala index 22930e7..661aeef 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala @@ -39,7 +39,7 @@ class DataSetSort( traitSet: RelTraitSet, inp: RelNode, collations: RelCollation, - rowType2: RelDataType, + rowRelDataType: RelDataType, offset: RexNode, fetch: RexNode) extends SingleRel(cluster, traitSet, inp) @@ -57,13 +57,15 @@ class DataSetSort( Long.MaxValue } + override def deriveRowType(): RelDataType = rowRelDataType + override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = { new DataSetSort( cluster, traitSet, inputs.get(0), collations, - rowType2, + getRowType, offset, fetch ) @@ -138,15 +140,15 @@ class DataSetSort( if (determinedType != inputType) { val mapFunc = getConversionMapper( - config, - false, - partitionedDs.getType, - determinedType, - "DataSetSortConversion", - getRowType.getFieldNames.asScala + config = config, + nullableInput = false, + inputType = partitionedDs.getType, + expectedType = determinedType, + conversionOperatorName = "DataSetSortConversion", + fieldNames = getRowType.getFieldNames.asScala ) - val opName = s"convert: (${rowType.getFieldNames.asScala.toList.mkString(", ")})" + val opName = s"convert: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})" limitedDs.map(mapFunc).name(opName) } @@ -170,7 +172,7 @@ class DataSetSort( .map(c => (c.getFieldIndex, directionToOrder(c.getDirection))) private val sortFieldsToString = fieldCollations - .map(col => s"${rowType2.getFieldNames.get(col._1)} ${col._2.getShortName}" ).mkString(", ") + .map(col => s"${getRowType.getFieldNames.get(col._1)} ${col._2.getShortName}" ).mkString(", ") private val offsetToString = s"$offset" 
http://git-wip-us.apache.org/repos/asf/flink/blob/7eb45c13/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala index ff1ff29..6e43fae 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala @@ -36,13 +36,13 @@ import scala.collection.JavaConverters._ class DataSetUnion( cluster: RelOptCluster, traitSet: RelTraitSet, - left: RelNode, - right: RelNode, - rowType: RelDataType) - extends BiRel(cluster, traitSet, left, right) + leftNode: RelNode, + rightNode: RelNode, + rowRelDataType: RelDataType) + extends BiRel(cluster, traitSet, leftNode, rightNode) with DataSetRel { - override def deriveRowType() = rowType + override def deriveRowType() = rowRelDataType override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { new DataSetUnion( @@ -50,7 +50,7 @@ class DataSetUnion( traitSet, inputs.get(0), inputs.get(1), - rowType + rowRelDataType ) } @@ -72,6 +72,11 @@ class DataSetUnion( planner.getCostFactory.makeCost(rowCnt, 0, 0) } + override def estimateRowCount(mq: RelMetadataQuery): Double = { + // adopted from org.apache.calcite.rel.metadata.RelMdUtil.getUnionAllRowCount + getInputs.foldLeft(0.0)(_ + mq.getRowCount(_)) + } + override def translateToPlan( tableEnv: BatchTableEnvironment, expectedType: Option[TypeInformation[Any]]): DataSet[Any] = { @@ -93,7 +98,7 @@ class DataSetUnion( } private def unionSelectionToString: String = { - rowType.getFieldNames.asScala.toList.mkString(", ") + 
rowRelDataType.getFieldNames.asScala.toList.mkString(", ") } } http://git-wip-us.apache.org/repos/asf/flink/blob/7eb45c13/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetValues.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetValues.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetValues.scala index a31f199..1b637c8 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetValues.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetValues.scala @@ -41,24 +41,24 @@ import scala.collection.JavaConversions._ class DataSetValues( cluster: RelOptCluster, traitSet: RelTraitSet, - rowType: RelDataType, + rowRelDataType: RelDataType, tuples: ImmutableList[ImmutableList[RexLiteral]]) - extends Values(cluster, rowType, tuples, traitSet) + extends Values(cluster, rowRelDataType, tuples, traitSet) with DataSetRel { - override def deriveRowType() = rowType + override def deriveRowType() = rowRelDataType override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { new DataSetValues( cluster, traitSet, - rowType, - tuples + getRowType, + getTuples ) } override def toString: String = { - "Values(values: (${rowType.getFieldNames.asScala.toList.mkString(\", \")}))" + s"Values(values: (${getRowType.getFieldNames.asScala.toList.mkString(", ")}))" } override def explainTerms(pw: RelWriter): RelWriter = { @@ -78,7 +78,7 @@ class DataSetValues( config.getEfficientTypeUsage).asInstanceOf[RowTypeInfo] // convert List[RexLiteral] to Row - val rows: Seq[Row] = tuples.asList.map { t => + val rows: Seq[Row] = getTuples.asList.map { t => val row = new Row(t.size()) t.zipWithIndex.foreach( x => row.setField(x._2, 
x._1.getValue.asInstanceOf[Any]) ) row @@ -89,7 +89,7 @@ class DataSetValues( } private def valuesFieldsToString: String = { - rowType.getFieldNames.asScala.toList.mkString(", ") + getRowType.getFieldNames.asScala.toList.mkString(", ") } } http://git-wip-us.apache.org/repos/asf/flink/blob/7eb45c13/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCalc.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCalc.scala index 334c0aa..5312a5f 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCalc.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCalc.scala @@ -38,33 +38,33 @@ class DataStreamCalc( cluster: RelOptCluster, traitSet: RelTraitSet, input: RelNode, - rowType: RelDataType, + rowRelDataType: RelDataType, calcProgram: RexProgram, ruleDescription: String) extends SingleRel(cluster, traitSet, input) with FlinkCalc with DataStreamRel { - override def deriveRowType() = rowType + override def deriveRowType() = rowRelDataType override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { new DataStreamCalc( cluster, traitSet, inputs.get(0), - rowType, + getRowType, calcProgram, ruleDescription ) } - override def toString: String = calcToString(calcProgram, getExpressionString(_, _, _)) + override def toString: String = calcToString(calcProgram, getExpressionString) override def explainTerms(pw: RelWriter): RelWriter = { super.explainTerms(pw) - .item("select", selectionToString(calcProgram, getExpressionString(_, _, _))) + .item("select", selectionToString(calcProgram, getExpressionString)) .itemIf("where", - 
conditionToString(calcProgram, getExpressionString(_, _, _)), + conditionToString(calcProgram, getExpressionString), calcProgram.getCondition != null) } @@ -74,7 +74,7 @@ class DataStreamCalc( val config = tableEnv.getConfig - val inputDataStream = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv) + val inputDataStream = getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv) val returnType = determineReturnType( getRowType, @@ -99,6 +99,6 @@ class DataStreamCalc( returnType) val mapFunc = calcMapFunction(genFunction) - inputDataStream.flatMap(mapFunc).name(calcOpName(calcProgram, getExpressionString(_, _, _))) + inputDataStream.flatMap(mapFunc).name(calcOpName(calcProgram, getExpressionString)) } } http://git-wip-us.apache.org/repos/asf/flink/blob/7eb45c13/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamScan.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamScan.scala index cfd04b0..463e1bc 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamScan.scala @@ -38,14 +38,14 @@ class DataStreamScan( rowType: RelDataType) extends StreamScan(cluster, traitSet, table, rowType) { - val dataStreamTable: DataStreamTable[Any] = table.unwrap(classOf[DataStreamTable[Any]]) + val dataStreamTable: DataStreamTable[Any] = getTable.unwrap(classOf[DataStreamTable[Any]]) override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { new DataStreamScan( cluster, traitSet, - table, - rowType + getTable, + getRowType ) } 
http://git-wip-us.apache.org/repos/asf/flink/blob/7eb45c13/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamUnion.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamUnion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamUnion.scala index e72e9a8..f490d31 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamUnion.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamUnion.scala @@ -34,13 +34,13 @@ import scala.collection.JavaConverters._ class DataStreamUnion( cluster: RelOptCluster, traitSet: RelTraitSet, - left: RelNode, - right: RelNode, - rowType: RelDataType) - extends BiRel(cluster, traitSet, left, right) + leftNode: RelNode, + rightNode: RelNode, + rowRelDataType: RelDataType) + extends BiRel(cluster, traitSet, leftNode, rightNode) with DataStreamRel { - override def deriveRowType() = rowType + override def deriveRowType() = rowRelDataType override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { new DataStreamUnion( @@ -48,7 +48,7 @@ class DataStreamUnion( traitSet, inputs.get(0), inputs.get(1), - rowType + getRowType ) } @@ -57,7 +57,7 @@ class DataStreamUnion( } override def toString = { - "Union(union: (${rowType.getFieldNames.asScala.toList.mkString(\", \")}))" + s"Union(union: (${getRowType.getFieldNames.asScala.toList.mkString(", ")}))" } override def translateToPlan( @@ -70,6 +70,6 @@ class DataStreamUnion( } private def unionSelectionToString: String = { - rowType.getFieldNames.asScala.toList.mkString(", ") + getRowType.getFieldNames.asScala.toList.mkString(", ") } } 
http://git-wip-us.apache.org/repos/asf/flink/blob/7eb45c13/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamValues.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamValues.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamValues.scala index 3ae19ac..44130e7 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamValues.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamValues.scala @@ -39,19 +39,19 @@ import scala.collection.JavaConversions._ class DataStreamValues( cluster: RelOptCluster, traitSet: RelTraitSet, - rowType: RelDataType, + rowRelDataType: RelDataType, tuples: ImmutableList[ImmutableList[RexLiteral]]) - extends Values(cluster, rowType, tuples, traitSet) + extends Values(cluster, rowRelDataType, tuples, traitSet) with DataStreamRel { - override def deriveRowType() = rowType + override def deriveRowType() = rowRelDataType override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { new DataStreamValues( cluster, traitSet, - rowType, - tuples + getRowType, + getTuples ) } @@ -68,7 +68,7 @@ class DataStreamValues( config.getEfficientTypeUsage).asInstanceOf[RowTypeInfo] // convert List[RexLiteral] to Row - val rows: Seq[Row] = tuples.asList.map { t => + val rows: Seq[Row] = getTuples.asList.map { t => val row = new Row(t.size()) t.zipWithIndex.foreach( x => row.setField(x._2, x._1.getValue.asInstanceOf[Any]) ) row http://git-wip-us.apache.org/repos/asf/flink/blob/7eb45c13/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamScan.scala ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamScan.scala index 1f5fc6a..17620d0 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/StreamScan.scala @@ -38,11 +38,11 @@ abstract class StreamScan( cluster: RelOptCluster, traitSet: RelTraitSet, table: RelOptTable, - rowType: RelDataType) + rowRelDataType: RelDataType) extends TableScan(cluster, traitSet, table) with DataStreamRel { - override def deriveRowType() = rowType + override def deriveRowType() = rowRelDataType protected def convertToExpectedType( input: DataStream[Any], @@ -72,7 +72,7 @@ abstract class StreamScan( if (determinedType != inputType) { val generator = new CodeGenerator( config, - false, + nullableInput = false, input.getType, flinkTable.fieldIndexes) @@ -97,7 +97,7 @@ abstract class StreamScan( genFunction.code, genFunction.returnType) - val opName = s"from: (${rowType.getFieldNames.asScala.toList.mkString(", ")})" + val opName = s"from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})" input.map(mapFunc).name(opName) } http://git-wip-us.apache.org/repos/asf/flink/blob/7eb45c13/flink-libraries/flink-table/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/resources/log4j-test.properties b/flink-libraries/flink-table/src/test/resources/log4j-test.properties new file mode 100644 index 0000000..4c74d85 --- /dev/null +++ b/flink-libraries/flink-table/src/test/resources/log4j-test.properties @@ -0,0 +1,27 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# 
or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# Set root logger level to OFF to not flood build logs +# set manually to INFO for debugging purposes +log4j.rootLogger=OFF, testlogger + +# testlogger is set to be a ConsoleAppender. +log4j.appender.testlogger=org.apache.log4j.ConsoleAppender +log4j.appender.testlogger.target = System.err +log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout +log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n http://git-wip-us.apache.org/repos/asf/flink/blob/7eb45c13/flink-libraries/flink-table/src/test/resources/logback-test.xml ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/resources/logback-test.xml b/flink-libraries/flink-table/src/test/resources/logback-test.xml new file mode 100644 index 0000000..b99489e --- /dev/null +++ b/flink-libraries/flink-table/src/test/resources/logback-test.xml @@ -0,0 +1,29 @@ +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. 
The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<configuration> + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern> + </encoder> + </appender> + + <root level="WARN"> + <appender-ref ref="STDOUT"/> + </root> +</configuration>