amaliujia commented on a change in pull request #11868:
URL: https://github.com/apache/beam/pull/11868#discussion_r437064277
##########
File path:
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamTableFunctionScanRel.java
##########
@@ -95,29 +186,19 @@ public TableFunctionScan copy(
input);
String operatorName = ((RexCall) getCall()).getOperator().getName();
checkArgument(
- operatorName.equals("TUMBLE"),
- "Only support TUMBLE table-valued function. Current operator: %s",
+ TVFStreamingUtils.WINDOWING_TVF.contains(operatorName),
+ "Only support %s table-valued functions. Current operator: %s",
+ TVFStreamingUtils.WINDOWING_TVF,
operatorName);
- RexCall call = ((RexCall) getCall());
- RexInputRef wmCol = (RexInputRef) call.getOperands().get(1);
- PCollection<Row> upstream = input.get(0);
- Schema outputSchema = CalciteUtils.toSchema(getRowType());
- FixedWindows windowFn =
FixedWindows.of(durationParameter(call.getOperands().get(2)));
- PCollection<Row> streamWithWindowMetadata =
- upstream
- .apply(ParDo.of(new FixedWindowDoFn(windowFn, wmCol.getIndex(),
outputSchema)))
- .setRowSchema(outputSchema);
-
- PCollection<Row> windowedStream =
- assignTimestampsAndWindow(
- streamWithWindowMetadata, wmCol.getIndex(), (WindowFn) windowFn);
- return windowedStream;
+ return tvfToPTransformMap.get(operatorName).toPTransform(((RexCall) getCall()), input.get(0));
Review comment:
I am planning to keep it for now. Because we will support user-defined
table-valued functions (UDTVFs) in the near future, it seems to me that this
approach will be extensible to them.
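To illustrate the extensibility argument, here is a minimal, self-contained
sketch of name-based dispatch. It is not the PR's code: `TvfDispatchSketch`,
`register`, and the string-returning handlers are hypothetical stand-ins for
`tvfToPTransformMap` and `TVFToPTransform`, and no Beam or Calcite types are
used.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch: dispatch on the TVF operator name through a map of handlers.
final class TvfDispatchSketch {
  // Each handler takes a description of the input and returns a description of the result.
  private static final Map<String, Function<String, String>> HANDLERS = new HashMap<>();

  static {
    HANDLERS.put("TUMBLE", input -> "fixed windows over " + input);
    HANDLERS.put("HOP", input -> "sliding windows over " + input);
    HANDLERS.put("SESSION", input -> "session windows over " + input);
  }

  // A user-defined TVF would only need one more map entry, not a new branch.
  static void register(String operatorName, Function<String, String> handler) {
    HANDLERS.put(operatorName, handler);
  }

  static String apply(String operatorName, String input) {
    Function<String, String> handler = HANDLERS.get(operatorName);
    if (handler == null) {
      throw new IllegalArgumentException(
          "Unsupported table-valued function: " + operatorName);
    }
    return handler.apply(input);
  }
}
```

With a map, adding a function is a registration step, whereas a switch
statement has to be edited for every new operator name.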
##########
File path:
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamTableFunctionScanRel.java
##########
@@ -95,29 +186,19 @@ public TableFunctionScan copy(
input);
String operatorName = ((RexCall) getCall()).getOperator().getName();
checkArgument(
- operatorName.equals("TUMBLE"),
- "Only support TUMBLE table-valued function. Current operator: %s",
+ TVFStreamingUtils.WINDOWING_TVF.contains(operatorName),
+ "Only support %s table-valued functions. Current operator: %s",
+ TVFStreamingUtils.WINDOWING_TVF,
operatorName);
- RexCall call = ((RexCall) getCall());
- RexInputRef wmCol = (RexInputRef) call.getOperands().get(1);
- PCollection<Row> upstream = input.get(0);
- Schema outputSchema = CalciteUtils.toSchema(getRowType());
- FixedWindows windowFn =
FixedWindows.of(durationParameter(call.getOperands().get(2)));
- PCollection<Row> streamWithWindowMetadata =
- upstream
- .apply(ParDo.of(new FixedWindowDoFn(windowFn, wmCol.getIndex(),
outputSchema)))
- .setRowSchema(outputSchema);
-
- PCollection<Row> windowedStream =
- assignTimestampsAndWindow(
- streamWithWindowMetadata, wmCol.getIndex(), (WindowFn) windowFn);
- return windowedStream;
+ return tvfToPTransformMap.get(operatorName).toPTransform(((RexCall) getCall()), input.get(0));
Review comment:
If it turns out that UDTVFs will go to a different Rel, or to the same Rel but
a different code path, I will switch this back to a 'switch case'.
##########
File path:
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/TVFToPTransform.java
##########
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl;
+
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexCall;
+
+/** Provides a function that produces a PCollection based on TVF and upstream PCollection. */
+public interface TVFToPTransform {
Review comment:
Done.
##########
File path:
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/TVFSlidingWindowFn.java
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Objects;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.extensions.sql.impl.utils.TVFStreamingUtils;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.transforms.windowing.WindowFn.AssignContext;
+import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.Duration;
+
+/**
+ * TVFSlidingWindowFn assigns window based on input row's "window_start" and
"window_end"
+ * timestamps.
+ */
+public class TVFSlidingWindowFn extends NonMergingWindowFn<Object,
IntervalWindow> {
+ /** Amount of time between generated windows. */
+ private final Duration period;
+
+ /** Size of the generated windows. */
+ private final Duration size;
+
+ public static TVFSlidingWindowFn of(Duration size, Duration period) {
+ return new TVFSlidingWindowFn(size, period);
+ }
+
+ private TVFSlidingWindowFn(Duration size, Duration period) {
+ this.period = period;
+ this.size = size;
+ }
+
+ @Override
+ public Collection<IntervalWindow> assignWindows(AssignContext c) throws
Exception {
+ Row curRow = (Row) c.element();
+ // In sliding window as TVF syntax, each row contains its window's start and end as metadata,
+ // thus we can assign a window directly based on window's start and end metadata.
+ return Arrays.asList(
+ new IntervalWindow(
+ curRow.getDateTime(TVFStreamingUtils.WINDOW_START).toInstant(),
+ curRow.getDateTime(TVFStreamingUtils.WINDOW_END).toInstant()));
+ }
+
+ @Override
+ public boolean isCompatible(WindowFn<?, ?> other) {
+ return equals(other);
+ }
+
+ @Override
+ public Coder<IntervalWindow> windowCoder() {
+ return IntervalWindow.getCoder();
+ }
+
+ @Override
+ public WindowMappingFn<IntervalWindow> getDefaultWindowMappingFn() {
+ throw new UnsupportedOperationException(
+ "TVFSlidingWindow does not support side input windows.");
+ }
+
+ @Override
Review comment:
Done. Thanks!
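For context on why this WindowFn can read the window bounds straight from the
row: with HOP, one input row falls into several overlapping windows, and the
upstream DoFn emits one output row per window with window_start and window_end
appended. Below is a minimal sketch of that expansion in plain Java, assuming
non-negative millisecond timestamps and a zero offset. `SlidingWindowSketch`
and `windowsFor` are hypothetical names, and this is not Beam's
`SlidingWindows` implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: enumerate the sliding windows that contain one timestamp.
final class SlidingWindowSketch {
  /**
   * Returns {start, end} pairs for every window of the given size, started every
   * `period` milliseconds, that contains `timestamp`. Windows are half-open: [start, end).
   */
  static List<long[]> windowsFor(long timestamp, long size, long period) {
    List<long[]> windows = new ArrayList<>();
    // Latest window start that is not after the timestamp.
    long lastStart = timestamp - (timestamp % period);
    // Walk backwards while the window still covers the timestamp.
    for (long start = lastStart; start > timestamp - size; start -= period) {
      windows.add(new long[] {start, start + size});
    }
    return windows;
  }
}
```

With size 10 and period 5, a row at timestamp 12 lands in [10, 20) and [5, 15),
so it becomes two output rows, each of which already names its single window.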
##########
File path:
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamTableFunctionScanRel.java
##########
@@ -85,6 +97,85 @@ public TableFunctionScan copy(
}
private class Transform extends PTransform<PCollectionList<Row>,
PCollection<Row>> {
+ private TVFToPTransform tumbleToPTransform =
+ (call, upstream) -> {
+ RexInputRef wmCol = (RexInputRef) call.getOperands().get(1);
+ Schema outputSchema = CalciteUtils.toSchema(getRowType());
+ FixedWindows windowFn =
FixedWindows.of(durationParameter(call.getOperands().get(2)));
+ PCollection<Row> streamWithWindowMetadata =
+ upstream
+ .apply(ParDo.of(new FixedWindowDoFn(windowFn,
wmCol.getIndex(), outputSchema)))
+ .setRowSchema(outputSchema);
+
+ PCollection<Row> windowedStream =
+ assignTimestampsAndWindow(
+ streamWithWindowMetadata, wmCol.getIndex(), (WindowFn)
windowFn);
+
+ return windowedStream;
+ };
+
+ private TVFToPTransform hopToPTransform =
+ (call, upstream) -> {
+ RexInputRef wmCol = (RexInputRef) call.getOperands().get(1);
+ Schema outputSchema = CalciteUtils.toSchema(getRowType());
+
+ Duration size = durationParameter(call.getOperands().get(2));
+ Duration period = durationParameter(call.getOperands().get(3));
+ SlidingWindows windowFn = SlidingWindows.of(size).every(period);
+ PCollection<Row> streamWithWindowMetadata =
+ upstream
+ .apply(ParDo.of(new SlidingWindowDoFn(windowFn,
wmCol.getIndex(), outputSchema)))
+ .setRowSchema(outputSchema);
+
+ // Sliding window needs this special WindowFn to assign windows
based on window_start,
+ // window_end metadata.
+ WindowFn specialWindowFn = TVFSlidingWindowFn.of(size, period);
+
+ PCollection<Row> windowedStream =
+ assignTimestampsAndWindow(
+ streamWithWindowMetadata, wmCol.getIndex(), specialWindowFn);
+
+ return windowedStream;
+ };
+
+ private TVFToPTransform sessionToPTransform =
+ (call, upstream) -> {
+ RexInputRef wmCol = (RexInputRef) call.getOperands().get(1);
+ Duration gap = durationParameter(call.getOperands().get(2));
+
+ Sessions sessions = Sessions.withGapDuration(gap);
+
+ PCollection<Row> windowedStream =
+ assignTimestampsAndWindow(upstream, wmCol.getIndex(), sessions);
+
+ Schema outputSchema = CalciteUtils.toSchema(getRowType());
+ // To extract session's window metadata, we apply a GroupByKey with a dummy key. It is
+ // because
+ // session is merging window. After GBK, SessionWindowDoFn will help extract window_start,
+ // window_end metadata.
+ PCollection<Row> streamWithWindowMetadata =
+ windowedStream
+ .apply(WithKeys.of("dummy"))
Review comment:
I don't know if there is any documentation. This is indeed the way to
get SESSION_END (I found it in our codebase).
On the scalability concern, note that GBK works on a per-key and per-window
basis, so at least this GBK groups by window, which reduces hot keys.
Of course there might still be hot keys if there is a very large SESSION
window. I have logged https://jira.apache.org/jira/browse/CALCITE-4051 for
an improvement idea.
##########
File path:
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamTableFunctionScanRel.java
##########
@@ -95,29 +186,19 @@ public TableFunctionScan copy(
input);
String operatorName = ((RexCall) getCall()).getOperator().getName();
checkArgument(
- operatorName.equals("TUMBLE"),
- "Only support TUMBLE table-valued function. Current operator: %s",
+ TVFStreamingUtils.WINDOWING_TVF.contains(operatorName),
+ "Only support %s table-valued functions. Current operator: %s",
+ TVFStreamingUtils.WINDOWING_TVF,
operatorName);
- RexCall call = ((RexCall) getCall());
- RexInputRef wmCol = (RexInputRef) call.getOperands().get(1);
- PCollection<Row> upstream = input.get(0);
- Schema outputSchema = CalciteUtils.toSchema(getRowType());
- FixedWindows windowFn =
FixedWindows.of(durationParameter(call.getOperands().get(2)));
- PCollection<Row> streamWithWindowMetadata =
- upstream
- .apply(ParDo.of(new FixedWindowDoFn(windowFn, wmCol.getIndex(),
outputSchema)))
- .setRowSchema(outputSchema);
-
- PCollection<Row> windowedStream =
- assignTimestampsAndWindow(
- streamWithWindowMetadata, wmCol.getIndex(), (WindowFn) windowFn);
- return windowedStream;
+ return tvfToPTransformMap.get(operatorName).toPTransform(((RexCall) getCall()), input.get(0));
}
/** Extract timestamps from the windowFieldIndex, then window into
windowFns. */
private PCollection<Row> assignTimestampsAndWindow(
- PCollection<Row> upstream, int windowFieldIndex, WindowFn<Row, IntervalWindow> windowFn) {
+ PCollection<Row> upstream,
+ int windowFieldIndex,
+ WindowFn<Object, IntervalWindow> windowFn) {
Review comment:
I cast the session to WindowFn, and the type parameter is now `Row`.
##########
File path:
sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlAnalyzer.java
##########
@@ -206,7 +206,37 @@ private void addBuiltinFunctionsToCatalog(SimpleCatalog
catalog, AnalyzerOptions
// TUMBLE
catalog.addTableValuedFunction(
new
TableValuedFunction.ForwardInputSchemaToOutputSchemaWithAppendedColumnTVF(
- ImmutableList.of("TUMBLE"),
+ ImmutableList.of(TVFStreamingUtils.FIXED_WINDOW_TVF),
+ new FunctionSignature(
+ retType, ImmutableList.of(inputTableType, descriptorType, stringType), 123),
Review comment:
Good point. I changed it back to `-1`. I have seen `-1` used in the internal
codebase.
##########
File path:
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamTableFunctionScanRel.java
##########
@@ -166,6 +247,54 @@ public void processElement(ProcessContext c) {
}
}
+ private static class SlidingWindowDoFn extends DoFn<Row, Row> {
+ private final int windowFieldIndex;
+ private final SlidingWindows windowFn;
+ private final Schema outputSchema;
+
+ public SlidingWindowDoFn(SlidingWindows windowFn, int windowFieldIndex,
Schema schema) {
+ this.windowFn = windowFn;
+ this.windowFieldIndex = windowFieldIndex;
+ this.outputSchema = schema;
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ Row row = c.element();
+ Collection<IntervalWindow> windows =
+
windowFn.assignWindows(row.getDateTime(windowFieldIndex).toInstant());
+ for (IntervalWindow window : windows) {
+ Row.Builder builder = Row.withSchema(outputSchema);
+ builder.addValues(row.getValues());
+ builder.addValue(window.start());
+ builder.addValue(window.end());
+ c.output(builder.build());
+ }
+ }
+ }
+
+ private static class SessionWindowDoFn extends DoFn<KV<String,
Iterable<Row>>, Row> {
+ private final Schema outputSchema;
+
+ public SessionWindowDoFn(Schema schema) {
+ this.outputSchema = schema;
+ }
+
+ @ProcessElement
+ public void processElement(
+ @Element KV<String, Iterable<Row>> element, BoundedWindow window, OutputReceiver<Row> out) {
+ IntervalWindow intervalWindow = (IntervalWindow) window;
+ for (Row cur : element.getValue()) {
Review comment:
See the scalability comment above. Because GBK works on a per-key and
per-window basis, and I used a "dummy" key above, this ParDo will go through
every element in a session window. If that window is large, this could cause a
problem. (Hot keys are always a concern for any GBK.)
Logged https://jira.apache.org/jira/browse/CALCITE-4051 for future
improvement.
##########
File path:
sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
##########
@@ -4805,6 +4805,93 @@ public void testTVFTumbleAggregation() {
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
+ @Test
Review comment:
I have moved all streaming tests to another file.
##########
File path:
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamTableFunctionScanRel.java
##########
@@ -85,6 +97,85 @@ public TableFunctionScan copy(
}
private class Transform extends PTransform<PCollectionList<Row>,
PCollection<Row>> {
+ private TVFToPTransform tumbleToPTransform =
+ (call, upstream) -> {
+ RexInputRef wmCol = (RexInputRef) call.getOperands().get(1);
+ Schema outputSchema = CalciteUtils.toSchema(getRowType());
+ FixedWindows windowFn =
FixedWindows.of(durationParameter(call.getOperands().get(2)));
+ PCollection<Row> streamWithWindowMetadata =
+ upstream
+ .apply(ParDo.of(new FixedWindowDoFn(windowFn,
wmCol.getIndex(), outputSchema)))
+ .setRowSchema(outputSchema);
+
+ PCollection<Row> windowedStream =
+ assignTimestampsAndWindow(
+ streamWithWindowMetadata, wmCol.getIndex(), (WindowFn)
windowFn);
+
+ return windowedStream;
+ };
+
+ private TVFToPTransform hopToPTransform =
+ (call, upstream) -> {
+ RexInputRef wmCol = (RexInputRef) call.getOperands().get(1);
+ Schema outputSchema = CalciteUtils.toSchema(getRowType());
+
+ Duration size = durationParameter(call.getOperands().get(2));
+ Duration period = durationParameter(call.getOperands().get(3));
+ SlidingWindows windowFn = SlidingWindows.of(size).every(period);
+ PCollection<Row> streamWithWindowMetadata =
+ upstream
+ .apply(ParDo.of(new SlidingWindowDoFn(windowFn,
wmCol.getIndex(), outputSchema)))
+ .setRowSchema(outputSchema);
+
+ // Sliding window needs this special WindowFn to assign windows
based on window_start,
+ // window_end metadata.
+ WindowFn specialWindowFn = TVFSlidingWindowFn.of(size, period);
+
+ PCollection<Row> windowedStream =
+ assignTimestampsAndWindow(
+ streamWithWindowMetadata, wmCol.getIndex(), specialWindowFn);
+
+ return windowedStream;
+ };
+
+ private TVFToPTransform sessionToPTransform =
+ (call, upstream) -> {
+ RexInputRef wmCol = (RexInputRef) call.getOperands().get(1);
+ Duration gap = durationParameter(call.getOperands().get(2));
+
+ Sessions sessions = Sessions.withGapDuration(gap);
+
+ PCollection<Row> windowedStream =
+ assignTimestampsAndWindow(upstream, wmCol.getIndex(), sessions);
+
+ Schema outputSchema = CalciteUtils.toSchema(getRowType());
+ // To extract session's window metadata, we apply a GroupByKey with a dummy key. It is
+ // because
+ // session is merging window. After GBK, SessionWindowDoFn will help extract window_start,
+ // window_end metadata.
+ PCollection<Row> streamWithWindowMetadata =
+ windowedStream
+ .apply(WithKeys.of("dummy"))
Review comment:
I don't know if there is any documentation. This is indeed the way to
get SESSION_END (I found it in our codebase).
On the scalability concern, note that GBK works on a per-key and per-window
basis, so at least this GBK groups by window, which reduces hot keys.
Of course there might still be hot keys if there is a very large SESSION
window. I have logged https://jira.apache.org/jira/browse/BEAM-10216 for
an improvement idea.
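To make the merging behaviour concrete, here is a small plain-Java sketch
(hypothetical `SessionMergeSketch`, not Beam code). It assumes each element
starts with a proto-window [t, t + gap) and that overlapping proto-windows are
merged, which is why the final window_start and window_end are only known
after grouping.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: merge per-element proto-windows into session windows.
final class SessionMergeSketch {
  /** Returns merged sessions as {start, end} pairs for the given event times in millis. */
  static List<long[]> mergeSessions(long[] timestamps, long gapMillis) {
    long[] sorted = timestamps.clone();
    Arrays.sort(sorted);
    List<long[]> sessions = new ArrayList<>();
    for (long t : sorted) {
      long start = t;
      long end = t + gapMillis;
      long[] last = sessions.isEmpty() ? null : sessions.get(sessions.size() - 1);
      // This sketch merges only when the new window starts strictly before the last one ends.
      if (last != null && start < last[1]) {
        last[1] = Math.max(last[1], end);
      } else {
        sessions.add(new long[] {start, end});
      }
    }
    return sessions;
  }
}
```

For event times 0, 3 and 20 with a gap of 5, the first two elements merge into
[0, 8) and the third forms [20, 25). No single element knows the bound 8 on
its own, so the bounds have to be read from the merged window after the GBK.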
##########
File path:
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamTableFunctionScanRel.java
##########
@@ -95,29 +186,19 @@ public TableFunctionScan copy(
input);
String operatorName = ((RexCall) getCall()).getOperator().getName();
checkArgument(
- operatorName.equals("TUMBLE"),
- "Only support TUMBLE table-valued function. Current operator: %s",
+ TVFStreamingUtils.WINDOWING_TVF.contains(operatorName),
Review comment:
Good point!
##########
File path:
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamTableFunctionScanRel.java
##########
@@ -166,6 +247,54 @@ public void processElement(ProcessContext c) {
}
}
+ private static class SlidingWindowDoFn extends DoFn<Row, Row> {
+ private final int windowFieldIndex;
+ private final SlidingWindows windowFn;
+ private final Schema outputSchema;
+
+ public SlidingWindowDoFn(SlidingWindows windowFn, int windowFieldIndex,
Schema schema) {
+ this.windowFn = windowFn;
+ this.windowFieldIndex = windowFieldIndex;
+ this.outputSchema = schema;
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ Row row = c.element();
+ Collection<IntervalWindow> windows =
+
windowFn.assignWindows(row.getDateTime(windowFieldIndex).toInstant());
+ for (IntervalWindow window : windows) {
+ Row.Builder builder = Row.withSchema(outputSchema);
+ builder.addValues(row.getValues());
+ builder.addValue(window.start());
+ builder.addValue(window.end());
+ c.output(builder.build());
+ }
+ }
+ }
+
+ private static class SessionWindowDoFn extends DoFn<KV<String,
Iterable<Row>>, Row> {
+ private final Schema outputSchema;
+
+ public SessionWindowDoFn(Schema schema) {
+ this.outputSchema = schema;
+ }
+
+ @ProcessElement
+ public void processElement(
+ @Element KV<String, Iterable<Row>> element, BoundedWindow window, OutputReceiver<Row> out) {
+ IntervalWindow intervalWindow = (IntervalWindow) window;
+ for (Row cur : element.getValue()) {
Review comment:
See the scalability comment above. Because GBK works on a per-key and
per-window basis, and I used a "dummy" key above, this ParDo will go through
every element in a session window. If that window is large, this could cause a
problem. (Hot keys are always a concern for any GBK.)
Logged 299d596d873304b87db688fc953cf2d5b28139fe for future improvement.
##########
File path:
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamTableFunctionScanRel.java
##########
@@ -166,6 +247,54 @@ public void processElement(ProcessContext c) {
}
}
+ private static class SlidingWindowDoFn extends DoFn<Row, Row> {
+ private final int windowFieldIndex;
+ private final SlidingWindows windowFn;
+ private final Schema outputSchema;
+
+ public SlidingWindowDoFn(SlidingWindows windowFn, int windowFieldIndex,
Schema schema) {
+ this.windowFn = windowFn;
+ this.windowFieldIndex = windowFieldIndex;
+ this.outputSchema = schema;
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ Row row = c.element();
+ Collection<IntervalWindow> windows =
+
windowFn.assignWindows(row.getDateTime(windowFieldIndex).toInstant());
+ for (IntervalWindow window : windows) {
+ Row.Builder builder = Row.withSchema(outputSchema);
+ builder.addValues(row.getValues());
+ builder.addValue(window.start());
+ builder.addValue(window.end());
+ c.output(builder.build());
+ }
+ }
+ }
+
+ private static class SessionWindowDoFn extends DoFn<KV<String,
Iterable<Row>>, Row> {
+ private final Schema outputSchema;
+
+ public SessionWindowDoFn(Schema schema) {
+ this.outputSchema = schema;
+ }
+
+ @ProcessElement
+ public void processElement(
+ @Element KV<String, Iterable<Row>> element, BoundedWindow window, OutputReceiver<Row> out) {
+ IntervalWindow intervalWindow = (IntervalWindow) window;
+ for (Row cur : element.getValue()) {
Review comment:
See the scalability comment above. Because GBK works on a per-key and
per-window basis, and I used a "dummy" key above, this ParDo will go through
every element in a session window. If that window is large, this could cause a
problem. (Hot keys are always a concern for any GBK.)
Logged https://jira.apache.org/jira/browse/BEAM-10216 for future improvement.
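As a rough illustration of the per-key, per-window grouping, the plain-Java
sketch below counts elements per (key, window) pair. `GroupByKeyAndWindowSketch`
and `groupSizes` are hypothetical names, windows are represented as plain
strings, and nothing here calls Beam.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: count how many elements end up in each (key, window) group.
final class GroupByKeyAndWindowSketch {
  /** keys[i] and windows[i] describe element i; the result maps "key@window" to a count. */
  static Map<String, Integer> groupSizes(String[] keys, String[] windows) {
    Map<String, Integer> sizes = new HashMap<>();
    for (int i = 0; i < keys.length; i++) {
      // Elements are grouped together only if both key and window match.
      sizes.merge(keys[i] + "@" + windows[i], 1, Integer::sum);
    }
    return sizes;
  }
}
```

With a single "dummy" key, the number of groups equals the number of session
windows, and the size of each group equals the number of elements in that
session. One very large session therefore produces one very large group.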