This is an automated email from the ASF dual-hosted git repository. gaoyunhaii pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new f31770fcf57 [FLINK-33128] Add converter.open() method on TestValuesRuntimeFunctions f31770fcf57 is described below commit f31770fcf5769052f1ac32a6529de979eaf339a4 Author: Jerome Gagnon <jgag...@wealthsimple.com> AuthorDate: Fri Sep 22 14:35:19 2023 -0400 [FLINK-33128] Add converter.open() method on TestValuesRuntimeFunctions This closes #23453. --- .../table/planner/factories/TestValuesRuntimeFunctions.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java index a381d573c41..3ab136d451f 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java @@ -36,6 +36,7 @@ import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.table.connector.RuntimeConverter; import org.apache.flink.table.connector.sink.DynamicTableSink.DataStructureConverter; import org.apache.flink.table.connector.source.LookupTableSource; import org.apache.flink.table.data.GenericRowData; @@ -631,11 +632,11 @@ final class TestValuesRuntimeFunctions { public void open(FunctionContext context) throws Exception { RESOURCE_COUNTER.incrementAndGet(); isOpenCalled = true; + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); if (projectable) { - projection = - generatedProjection.newInstance( - Thread.currentThread().getContextClassLoader()); + projection = generatedProjection.newInstance(classLoader); } + converter.open(RuntimeConverter.Context.create(classLoader)); rowSerializer = InternalSerializers.create(producedRowType); indexDataByKey(); } @@ -725,11 +726,11 @@ final class TestValuesRuntimeFunctions { @Override public void open(FunctionContext context) throws Exception { RESOURCE_COUNTER.incrementAndGet(); + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); if (projectable) { - projection = - generatedProjection.newInstance( - Thread.currentThread().getContextClassLoader()); + projection = generatedProjection.newInstance(classLoader); } + converter.open(RuntimeConverter.Context.create(classLoader)); rowSerializer = InternalSerializers.create(producedRowType); isOpenCalled = true; // generate unordered result for async lookup