[ 
https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15868726#comment-15868726
 ] 

ASF GitHub Bot commented on FLINK-3849:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3166#discussion_r101397995
  
    --- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala
 ---
    @@ -98,4 +106,134 @@ object CommonTestData {
           this(null, null)
         }
       }
    +
    +  def getMockTableEnvironment: TableEnvironment = new MockTableEnvironment
    +
    +  def getFilterableTableSource(
    +    fieldNames: Array[String] = Array[String](
    +      "name", "id", "amount", "price"),
    +    fieldTypes: Array[TypeInformation[_]] = Array(
    +      BasicTypeInfo.STRING_TYPE_INFO,
    +      BasicTypeInfo.LONG_TYPE_INFO,
    +      BasicTypeInfo.INT_TYPE_INFO,
    +      BasicTypeInfo.DOUBLE_TYPE_INFO)) = new 
TestFilterableTableSource(fieldNames, fieldTypes)
    +}
    +
    +class MockTableEnvironment extends TableEnvironment(new TableConfig) {
    +
    +  override private[flink] def writeToSink[T](table: Table, sink: 
TableSink[T]): Unit = ???
    +
    +  override protected def checkValidTableName(name: String): Unit = ???
    +
    +  override protected def getBuiltInRuleSet: RuleSet = ???
    +
    +  override def sql(query: String): Table = ???
    +
    +  override def registerTableSource(name: String, tableSource: 
TableSource[_]): Unit = ???
    +}
    +
    +class TestFilterableTableSource(
    +    fieldNames: Array[String],
    +    fieldTypes: Array[TypeInformation[_]])
    +  extends BatchTableSource[Row]
    +    with StreamTableSource[Row]
    +    with FilterableTableSource
    +    with DefinedFieldNames {
    +
    +  private var filterPredicate: Option[Expression] = None
    +
    +  /** Returns the data of the table as a [[DataSet]]. */
    +  override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = {
    +    execEnv.fromCollection[Row](
    +      generateDynamicCollection(33, fieldNames, filterPredicate).asJava, 
getReturnType)
    +  }
    +
    +  /** Returns the data of the table as a [[DataStream]]. */
    +  def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] 
= {
    +    execEnv.fromCollection[Row](
    +      generateDynamicCollection(33, fieldNames, filterPredicate).asJava, 
getReturnType)
    +  }
    +
    +  private def generateDynamicCollection(
    +    num: Int,
    +    fieldNames: Array[String],
    +    predicate: Option[Expression]): Seq[Row] = {
    +
    +    if (predicate.isEmpty) {
    +      throw new RuntimeException("filter expression was not set")
    +    }
    +
    +    val literal = predicate.get.children.last
    +      .asInstanceOf[Literal]
    +      .value.asInstanceOf[Int]
    +
    +    def shouldCreateRow(value: Int): Boolean = {
    +      value > literal
    +    }
    +
    +    def createRow(row: Row, name: String, pos: Int, value: Int): Unit = {
    --- End diff --
    
    With a hard-coded schema, this method would not be necessary


> Add FilterableTableSource interface and translation rule
> --------------------------------------------------------
>
>                 Key: FLINK-3849
>                 URL: https://issues.apache.org/jira/browse/FLINK-3849
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Fabian Hueske
>            Assignee: Anton Solovev
>
> Add a {{FilterableTableSource}} interface for {{TableSource}} implementations 
> which support filter push-down.
> The interface could look as follows
> {code}
> trait FilterableTableSource {
>   // returns unsupported predicate expression
>   def setPredicate(predicate: Expression): Expression
> }
> {code}
> In addition we need Calcite rules to push a predicate (or parts of it) into a 
> TableScan that refers to a {{FilterableTableSource}}. We might need to tweak 
> the cost model as well to push the optimizer in the right direction.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to