Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3715#discussion_r122838733
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
 ---
    @@ -162,8 +162,25 @@ class RelTimeIndicatorConverter(rexBuilder: 
RexBuilder) extends RelShuttle {
         LogicalProject.create(input, projects, fieldNames)
       }
     
    -  override def visit(join: LogicalJoin): RelNode =
    -    throw new TableException("Logical join in a stream environment is not 
supported yet.")
    +  override def visit(join: LogicalJoin): RelNode = {
    +    val left = join.getLeft.accept(this)
    +    val right = join.getRight.accept(this)
    +
    +    // check if input field contains time indicator type
    +    // materialize field if no time indicator is present anymore
    +    // if input field is already materialized, change to timestamp type
    +    val inputFields = left.getRowType.getFieldList.map(_.getType) ++
    +      right.getRowType.getFieldList.map(_.getType)
    +    val materializer = new RexTimeIndicatorMaterializer(
    +      rexBuilder,
    +      inputFields)
    +
    +    val condition = join.getCondition.accept(materializer)
    --- End diff --
    
    I think we do not need to materialize time indicators for join predicates. 
If the time indicators are used in valid time-based join predicates we do not 
code-gen the predicate and if they the time-based join predicate is not valid, 
the query will fail anyway.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to