godfreyhe commented on a change in pull request #16620:
URL: https://github.com/apache/flink/pull/16620#discussion_r681731759



##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.java
##########
@@ -128,32 +155,165 @@ public static RelNode convert(
     }
 
     @Override
-    public RelNode visit(LogicalIntersect intersect) {
-        return visitSetOp(intersect);
+    public RelNode visit(RelNode node) {
+        if (node instanceof FlinkLogicalValues) {
+            return node;
+        } else if (node instanceof FlinkLogicalIntersect
+                || node instanceof FlinkLogicalUnion
+                || node instanceof FlinkLogicalMinus) {
+            return visitSetOp((SetOp) node);
+        } else if (node instanceof FlinkLogicalTableFunctionScan
+                || node instanceof FlinkLogicalSnapshot
+                || node instanceof FlinkLogicalRank
+                || node instanceof FlinkLogicalDistribution
+                || node instanceof FlinkLogicalWatermarkAssigner
+                || node instanceof FlinkLogicalSort
+                || node instanceof FlinkLogicalOverAggregate
+                || node instanceof FlinkLogicalExpand) {
+            return visitSimpleRel(node);
+        } else if (node instanceof FlinkLogicalWindowAggregate) {
+            return visitWindowAggregate((FlinkLogicalWindowAggregate) node);
+        } else if (node instanceof FlinkLogicalWindowTableAggregate) {
+            FlinkLogicalWindowTableAggregate tableAgg = 
(FlinkLogicalWindowTableAggregate) node;
+            FlinkLogicalWindowAggregate correspondingAgg =
+                    new FlinkLogicalWindowAggregate(
+                            tableAgg.getCluster(),
+                            tableAgg.getTraitSet(),
+                            tableAgg.getInput(),
+                            tableAgg.getGroupSet(),
+                            tableAgg.getAggCallList(),
+                            tableAgg.getWindow(),
+                            tableAgg.getNamedProperties());
+            FlinkLogicalWindowAggregate convertedWindowAgg = 
visitWindowAggregate(correspondingAgg);
+            return new FlinkLogicalWindowTableAggregate(
+                    tableAgg.getCluster(),
+                    tableAgg.getTraitSet(),
+                    convertedWindowAgg.getInput(),
+                    tableAgg.getGroupSet(),
+                    tableAgg.getGroupSets(),
+                    convertedWindowAgg.getAggCallList(),
+                    tableAgg.getWindow(),
+                    tableAgg.getNamedProperties());
+        } else if (node instanceof FlinkLogicalAggregate) {
+            return visitAggregate((FlinkLogicalAggregate) node);
+        } else if (node instanceof FlinkLogicalTableAggregate) {
+            FlinkLogicalTableAggregate tableAgg = (FlinkLogicalTableAggregate) 
node;
+            FlinkLogicalAggregate correspondingAgg =
+                    FlinkLogicalAggregate.create(
+                            tableAgg.getInput(),
+                            tableAgg.getGroupSet(),
+                            tableAgg.getGroupSets(),
+                            tableAgg.getAggCallList());
+            FlinkLogicalAggregate convertedAgg = 
visitAggregate(correspondingAgg);
+            return new FlinkLogicalTableAggregate(
+                    tableAgg.getCluster(),
+                    tableAgg.getTraitSet(),
+                    convertedAgg.getInput(),
+                    convertedAgg.getGroupSet(),
+                    convertedAgg.getGroupSets(),
+                    convertedAgg.getAggCallList());
+        } else if (node instanceof FlinkLogicalMatch) {
+            return visitMatch((FlinkLogicalMatch) node);
+        } else if (node instanceof FlinkLogicalCalc) {
+            return visitCalc((FlinkLogicalCalc) node);
+        } else if (node instanceof FlinkLogicalCorrelate) {
+            return visitCorrelate((FlinkLogicalCorrelate) node);
+        } else if (node instanceof FlinkLogicalJoin) {
+            return visitJoin((FlinkLogicalJoin) node);
+        } else if (node instanceof FlinkLogicalSink) {
+            return visitSink((FlinkLogicalSink) node);
+        } else if (node instanceof FlinkLogicalLegacySink) {
+            return visitSink((FlinkLogicalLegacySink) node);
+        } else {
+            return visitInvalidRel(node);
+        }
+    }
+
+    @Override
+    public RelNode visit(TableScan scan) {
+        return scan;
+    }
+
+    @Override
+    public RelNode visit(TableFunctionScan scan) {
+        if (scan instanceof FlinkLogicalTableFunctionScan) {
+            return visitSimpleRel(scan);
+        } else {
+            return visitInvalidRel(scan);
+        }
+    }
+
+    @Override
+    public RelNode visit(LogicalValues values) {
+        return values;
+    }
+
+    @Override
+    public RelNode visit(LogicalFilter filter) {
+        return visitInvalidRel(filter);
+    }
+
+    @Override
+    public RelNode visit(LogicalCalc calc) {
+        return visitInvalidRel(calc);
+    }
+
+    @Override
+    public RelNode visit(LogicalProject project) {
+        return visitInvalidRel(project);
+    }
+
+    @Override
+    public RelNode visit(LogicalJoin join) {
+        return visitInvalidRel(join);
+    }
+
+    @Override
+    public RelNode visit(LogicalCorrelate correlate) {
+        return visitInvalidRel(correlate);
     }

Review comment:
       This class should extend from `RelHomogeneousShuttle` instead of 
`RelShuttle`, which could reduce a lot of code. these `public RelNode 
visit(LogicalXx xx) {}` methods can be removed.

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.java
##########
@@ -128,32 +155,165 @@ public static RelNode convert(
     }
 
     @Override
-    public RelNode visit(LogicalIntersect intersect) {
-        return visitSetOp(intersect);
+    public RelNode visit(RelNode node) {
+        if (node instanceof FlinkLogicalValues) {
+            return node;
+        } else if (node instanceof FlinkLogicalIntersect
+                || node instanceof FlinkLogicalUnion
+                || node instanceof FlinkLogicalMinus) {
+            return visitSetOp((SetOp) node);
+        } else if (node instanceof FlinkLogicalTableFunctionScan
+                || node instanceof FlinkLogicalSnapshot
+                || node instanceof FlinkLogicalRank
+                || node instanceof FlinkLogicalDistribution
+                || node instanceof FlinkLogicalWatermarkAssigner
+                || node instanceof FlinkLogicalSort
+                || node instanceof FlinkLogicalOverAggregate
+                || node instanceof FlinkLogicalExpand) {
+            return visitSimpleRel(node);
+        } else if (node instanceof FlinkLogicalWindowAggregate) {
+            return visitWindowAggregate((FlinkLogicalWindowAggregate) node);
+        } else if (node instanceof FlinkLogicalWindowTableAggregate) {
+            FlinkLogicalWindowTableAggregate tableAgg = 
(FlinkLogicalWindowTableAggregate) node;
+            FlinkLogicalWindowAggregate correspondingAgg =
+                    new FlinkLogicalWindowAggregate(
+                            tableAgg.getCluster(),
+                            tableAgg.getTraitSet(),
+                            tableAgg.getInput(),
+                            tableAgg.getGroupSet(),
+                            tableAgg.getAggCallList(),
+                            tableAgg.getWindow(),
+                            tableAgg.getNamedProperties());
+            FlinkLogicalWindowAggregate convertedWindowAgg = 
visitWindowAggregate(correspondingAgg);
+            return new FlinkLogicalWindowTableAggregate(
+                    tableAgg.getCluster(),
+                    tableAgg.getTraitSet(),
+                    convertedWindowAgg.getInput(),
+                    tableAgg.getGroupSet(),
+                    tableAgg.getGroupSets(),
+                    convertedWindowAgg.getAggCallList(),
+                    tableAgg.getWindow(),
+                    tableAgg.getNamedProperties());
+        } else if (node instanceof FlinkLogicalAggregate) {
+            return visitAggregate((FlinkLogicalAggregate) node);
+        } else if (node instanceof FlinkLogicalTableAggregate) {
+            FlinkLogicalTableAggregate tableAgg = (FlinkLogicalTableAggregate) 
node;
+            FlinkLogicalAggregate correspondingAgg =
+                    FlinkLogicalAggregate.create(
+                            tableAgg.getInput(),
+                            tableAgg.getGroupSet(),
+                            tableAgg.getGroupSets(),
+                            tableAgg.getAggCallList());
+            FlinkLogicalAggregate convertedAgg = 
visitAggregate(correspondingAgg);
+            return new FlinkLogicalTableAggregate(
+                    tableAgg.getCluster(),
+                    tableAgg.getTraitSet(),
+                    convertedAgg.getInput(),
+                    convertedAgg.getGroupSet(),
+                    convertedAgg.getGroupSets(),
+                    convertedAgg.getAggCallList());
+        } else if (node instanceof FlinkLogicalMatch) {
+            return visitMatch((FlinkLogicalMatch) node);
+        } else if (node instanceof FlinkLogicalCalc) {
+            return visitCalc((FlinkLogicalCalc) node);
+        } else if (node instanceof FlinkLogicalCorrelate) {
+            return visitCorrelate((FlinkLogicalCorrelate) node);
+        } else if (node instanceof FlinkLogicalJoin) {
+            return visitJoin((FlinkLogicalJoin) node);
+        } else if (node instanceof FlinkLogicalSink) {
+            return visitSink((FlinkLogicalSink) node);
+        } else if (node instanceof FlinkLogicalLegacySink) {
+            return visitSink((FlinkLogicalLegacySink) node);
+        } else {
+            return visitInvalidRel(node);
+        }
+    }
+
+    @Override
+    public RelNode visit(TableScan scan) {
+        return scan;
+    }
+
+    @Override
+    public RelNode visit(TableFunctionScan scan) {
+        if (scan instanceof FlinkLogicalTableFunctionScan) {
+            return visitSimpleRel(scan);
+        } else {
+            return visitInvalidRel(scan);
+        }
+    }
+
+    @Override
+    public RelNode visit(LogicalValues values) {
+        return values;
+    }
+
+    @Override
+    public RelNode visit(LogicalFilter filter) {
+        return visitInvalidRel(filter);
+    }
+
+    @Override
+    public RelNode visit(LogicalCalc calc) {
+        return visitInvalidRel(calc);
+    }
+
+    @Override
+    public RelNode visit(LogicalProject project) {
+        return visitInvalidRel(project);
+    }
+
+    @Override
+    public RelNode visit(LogicalJoin join) {
+        return visitInvalidRel(join);
+    }
+
+    @Override
+    public RelNode visit(LogicalCorrelate correlate) {
+        return visitInvalidRel(correlate);
     }

Review comment:
       This class should extend from `RelHomogeneousShuttle` instead of 
`RelShuttle`, which could reduce a lot of code. these `public RelNode 
visit(LogicalXx xx) {}` methods can be removed.

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.java
##########
@@ -128,32 +155,165 @@ public static RelNode convert(
     }
 
     @Override
-    public RelNode visit(LogicalIntersect intersect) {
-        return visitSetOp(intersect);
+    public RelNode visit(RelNode node) {
+        if (node instanceof FlinkLogicalValues) {
+            return node;
+        } else if (node instanceof FlinkLogicalIntersect
+                || node instanceof FlinkLogicalUnion
+                || node instanceof FlinkLogicalMinus) {
+            return visitSetOp((SetOp) node);
+        } else if (node instanceof FlinkLogicalTableFunctionScan
+                || node instanceof FlinkLogicalSnapshot
+                || node instanceof FlinkLogicalRank
+                || node instanceof FlinkLogicalDistribution
+                || node instanceof FlinkLogicalWatermarkAssigner
+                || node instanceof FlinkLogicalSort
+                || node instanceof FlinkLogicalOverAggregate
+                || node instanceof FlinkLogicalExpand) {
+            return visitSimpleRel(node);
+        } else if (node instanceof FlinkLogicalWindowAggregate) {
+            return visitWindowAggregate((FlinkLogicalWindowAggregate) node);
+        } else if (node instanceof FlinkLogicalWindowTableAggregate) {
+            FlinkLogicalWindowTableAggregate tableAgg = 
(FlinkLogicalWindowTableAggregate) node;
+            FlinkLogicalWindowAggregate correspondingAgg =
+                    new FlinkLogicalWindowAggregate(
+                            tableAgg.getCluster(),
+                            tableAgg.getTraitSet(),
+                            tableAgg.getInput(),
+                            tableAgg.getGroupSet(),
+                            tableAgg.getAggCallList(),
+                            tableAgg.getWindow(),
+                            tableAgg.getNamedProperties());
+            FlinkLogicalWindowAggregate convertedWindowAgg = 
visitWindowAggregate(correspondingAgg);
+            return new FlinkLogicalWindowTableAggregate(
+                    tableAgg.getCluster(),
+                    tableAgg.getTraitSet(),
+                    convertedWindowAgg.getInput(),
+                    tableAgg.getGroupSet(),
+                    tableAgg.getGroupSets(),
+                    convertedWindowAgg.getAggCallList(),
+                    tableAgg.getWindow(),
+                    tableAgg.getNamedProperties());
+        } else if (node instanceof FlinkLogicalAggregate) {
+            return visitAggregate((FlinkLogicalAggregate) node);
+        } else if (node instanceof FlinkLogicalTableAggregate) {
+            FlinkLogicalTableAggregate tableAgg = (FlinkLogicalTableAggregate) 
node;
+            FlinkLogicalAggregate correspondingAgg =
+                    FlinkLogicalAggregate.create(
+                            tableAgg.getInput(),
+                            tableAgg.getGroupSet(),
+                            tableAgg.getGroupSets(),
+                            tableAgg.getAggCallList());
+            FlinkLogicalAggregate convertedAgg = 
visitAggregate(correspondingAgg);
+            return new FlinkLogicalTableAggregate(
+                    tableAgg.getCluster(),
+                    tableAgg.getTraitSet(),
+                    convertedAgg.getInput(),
+                    convertedAgg.getGroupSet(),
+                    convertedAgg.getGroupSets(),
+                    convertedAgg.getAggCallList());

Review comment:
       ditto

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.java
##########
@@ -767,8 +920,11 @@ public RexNode visitCall(RexCall call) {
                         || updatedCallOp == 
FlinkSqlOperatorTable.SESSION_ROWTIME
                         || updatedCallOp == 
FlinkSqlOperatorTable.SESSION_PROCTIME
                         || updatedCallOp == 
FlinkSqlOperatorTable.MATCH_PROCTIME
-                        || updatedCallOp == FlinkSqlOperatorTable.PROCTIME
-                        || updatedCallOp == SqlStdOperatorTable.AS) {
+                        || updatedCallOp == FlinkSqlOperatorTable.PROCTIME) {
+                    return updatedCall;
+                } else if (updatedCallOp == SqlStdOperatorTable.AS
+                        || updatedCallOp == SqlStdOperatorTable.CAST
+                        || updatedCallOp == FlinkSqlOperatorTable.REINTERPRET) 
{

Review comment:
       merge them into one `if` branch

##########
File path: 
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeduplicateTest.xml
##########
@@ -207,7 +207,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], 
rowtime=[$4], rowNum=[$5])
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, 
1:BIGINT AS rowNum])
+Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, 
1:BIGINT AS $5])

Review comment:
       please create a JIRA to trace the change ?

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.java
##########
@@ -544,11 +595,110 @@ private RelNode convertAggInput(Aggregate agg) {
                 .collect(Collectors.toList());
     }
 
+    private FlinkLogicalWindowAggregate 
visitWindowAggregate(FlinkLogicalWindowAggregate agg) {
+        RelNode newInput = convertAggInput(agg);
+        List<AggregateCall> updatedAggCalls = convertAggregateCalls(agg);
+        LogicalWindow oldWindow = agg.getWindow();
+        Seq<PlannerNamedWindowProperty> oldNamedProperties = 
agg.getNamedProperties();
+        FieldReferenceExpression oldTimeAttribute = 
agg.getWindow().timeAttribute();
+        LogicalType oldTimeAttributeType = 
oldTimeAttribute.getOutputDataType().getLogicalType();
+        boolean isRowtimeIndicator = 
LogicalTypeChecks.isRowtimeAttribute(oldTimeAttributeType);
+        boolean convertedToRowtimeTimestampLtz;
+        if (!isRowtimeIndicator) {
+            convertedToRowtimeTimestampLtz = false;
+        } else {
+            int timeIndicatorIdx = oldTimeAttribute.getFieldIndex();
+            RelDataType oldType =
+                    
agg.getInput().getRowType().getFieldList().get(timeIndicatorIdx).getType();
+            RelDataType newType =
+                    
newInput.getRowType().getFieldList().get(timeIndicatorIdx).getType();
+            convertedToRowtimeTimestampLtz =
+                    isTimestampLtzType(newType) && 
!isTimestampLtzType(oldType);
+        }
+        LogicalWindow newWindow;
+        Seq<PlannerNamedWindowProperty> newNamedProperties;
+        if (convertedToRowtimeTimestampLtz) {
+            // MATCH_ROWTIME may be converted from rowtime attribute to 
timestamp_ltz rowtime
+            // attribute, if time indicator of current window aggregate 
depends on input
+            // MATCH_ROWTIME, we should rewrite logicalWindow and 
namedProperties.
+            LogicalType newTimestampLtzType =
+                    new LocalZonedTimestampType(
+                            oldTimeAttributeType.isNullable(), 
TimestampKind.ROWTIME, 3);
+            FieldReferenceExpression newFieldRef =
+                    new FieldReferenceExpression(
+                            oldTimeAttribute.getName(),
+                            fromLogicalTypeToDataType(newTimestampLtzType),
+                            oldTimeAttribute.getInputIndex(),
+                            oldTimeAttribute.getFieldIndex());
+            PlannerWindowReference newAlias =
+                    new PlannerWindowReference(
+                            oldWindow.aliasAttribute().getName(), 
newTimestampLtzType);
+            if (oldWindow instanceof TumblingGroupWindow) {
+                TumblingGroupWindow window = (TumblingGroupWindow) oldWindow;
+                newWindow = new TumblingGroupWindow(newAlias, newFieldRef, 
window.size());
+            } else if (oldWindow instanceof SlidingGroupWindow) {
+                SlidingGroupWindow window = (SlidingGroupWindow) oldWindow;
+                newWindow =
+                        new SlidingGroupWindow(
+                                newAlias, newFieldRef, window.size(), 
window.slide());
+            } else if (oldWindow instanceof SessionGroupWindow) {
+                SessionGroupWindow window = (SessionGroupWindow) oldWindow;
+                newWindow = new SessionGroupWindow(newAlias, newFieldRef, 
window.gap());
+            } else {
+                throw new TableException(
+                        String.format(
+                                "This is a bug and should not happen. Please 
file an issue. Invalid window %s.",
+                                oldWindow.getClass().getSimpleName()));
+            }
+            List<PlannerNamedWindowProperty> newNamedPropertiesList =
+                    
JavaConverters.seqAsJavaListConverter(oldNamedProperties).asJava().stream()
+                            .map(
+                                    namedProperty -> {
+                                        if (namedProperty.getProperty()
+                                                instanceof 
PlannerRowtimeAttribute) {
+                                            return new 
PlannerNamedWindowProperty(
+                                                    namedProperty.getName(),
+                                                    new 
PlannerRowtimeAttribute(newAlias));
+                                        } else {
+                                            return namedProperty;
+                                        }
+                                    })
+                            .collect(Collectors.toList());
+            newNamedProperties =
+                    
JavaConverters.iterableAsScalaIterableConverter(newNamedPropertiesList)
+                            .asScala()
+                            .toSeq();
+        } else {
+            newWindow = oldWindow;
+            newNamedProperties = oldNamedProperties;
+        }
+        return new FlinkLogicalWindowAggregate(
+                agg.getCluster(),
+                agg.getTraitSet(),
+                newInput,
+                agg.getGroupSet(),
+                updatedAggCalls,
+                newWindow,
+                newNamedProperties);
+    }
+
+    private RelNode visitInvalidRel(RelNode node) {
+        throw new TableException(
+                String.format(
+                        "This is a bug and should not happen. Please file an 
issue. Unknown node %s.",
+                        node.getRelTypeName()));
+    }
+
     // 
----------------------------------------------------------------------------------------
     //                                       Utility
     // 
----------------------------------------------------------------------------------------
 
     private RelNode materializeProcTime(RelNode node) {
+        // If input is empty values, ignore materialize
+        if (node instanceof FlinkLogicalValues
+                && FlinkLogicalValues.isEmpty((FlinkLogicalValues) node)) {
+            return node;
+        }

Review comment:
       Does this really need ? the following code can handle `empty values`

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.java
##########
@@ -128,32 +155,165 @@ public static RelNode convert(
     }
 
     @Override
-    public RelNode visit(LogicalIntersect intersect) {
-        return visitSetOp(intersect);
+    public RelNode visit(RelNode node) {
+        if (node instanceof FlinkLogicalValues) {
+            return node;
+        } else if (node instanceof FlinkLogicalIntersect
+                || node instanceof FlinkLogicalUnion
+                || node instanceof FlinkLogicalMinus) {
+            return visitSetOp((SetOp) node);
+        } else if (node instanceof FlinkLogicalTableFunctionScan
+                || node instanceof FlinkLogicalSnapshot
+                || node instanceof FlinkLogicalRank
+                || node instanceof FlinkLogicalDistribution
+                || node instanceof FlinkLogicalWatermarkAssigner
+                || node instanceof FlinkLogicalSort
+                || node instanceof FlinkLogicalOverAggregate
+                || node instanceof FlinkLogicalExpand) {
+            return visitSimpleRel(node);
+        } else if (node instanceof FlinkLogicalWindowAggregate) {
+            return visitWindowAggregate((FlinkLogicalWindowAggregate) node);
+        } else if (node instanceof FlinkLogicalWindowTableAggregate) {
+            FlinkLogicalWindowTableAggregate tableAgg = 
(FlinkLogicalWindowTableAggregate) node;
+            FlinkLogicalWindowAggregate correspondingAgg =
+                    new FlinkLogicalWindowAggregate(
+                            tableAgg.getCluster(),
+                            tableAgg.getTraitSet(),
+                            tableAgg.getInput(),
+                            tableAgg.getGroupSet(),
+                            tableAgg.getAggCallList(),
+                            tableAgg.getWindow(),
+                            tableAgg.getNamedProperties());
+            FlinkLogicalWindowAggregate convertedWindowAgg = 
visitWindowAggregate(correspondingAgg);
+            return new FlinkLogicalWindowTableAggregate(
+                    tableAgg.getCluster(),
+                    tableAgg.getTraitSet(),
+                    convertedWindowAgg.getInput(),
+                    tableAgg.getGroupSet(),
+                    tableAgg.getGroupSets(),
+                    convertedWindowAgg.getAggCallList(),
+                    tableAgg.getWindow(),
+                    tableAgg.getNamedProperties());

Review comment:
       nit: extract these code into a method, which could make the `public 
RelNode visit(RelNode node) {}` method  easier to read

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.java
##########
@@ -544,11 +595,110 @@ private RelNode convertAggInput(Aggregate agg) {
                 .collect(Collectors.toList());
     }
 
+    private FlinkLogicalWindowAggregate 
visitWindowAggregate(FlinkLogicalWindowAggregate agg) {

Review comment:
       are there any cases which can cover the change?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to