This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 957018c [FLINK-22811][format-avro] Drop usages of legacy planner in Avro module 957018c is described below commit 957018cfd3a61507a2b100438fbee5939abe50e1 Author: Timo Walther <twal...@apache.org> AuthorDate: Mon May 31 16:22:56 2021 +0200 [FLINK-22811][format-avro] Drop usages of legacy planner in Avro module This closes #16032. --- flink-formats/flink-avro/pom.xml | 62 ++++++++-------------- .../flink/table/runtime/batch/AvroTypesITCase.java | 62 ++++++++-------------- 2 files changed, 44 insertions(+), 80 deletions(-) diff --git a/flink-formats/flink-avro/pom.xml b/flink-formats/flink-avro/pom.xml index 06625bd..50c7ec8 100644 --- a/flink-formats/flink-avro/pom.xml +++ b/flink-formats/flink-avro/pom.xml @@ -36,7 +36,7 @@ under the License. <dependencies> - <!-- core dependencies --> + <!-- Core --> <dependency> <groupId>org.apache.flink</groupId> @@ -45,6 +45,19 @@ under the License. <scope>provided</scope> </dependency> + <!-- Table ecosystem --> + + <!-- Projects depending on this project won't depend on flink-table-*. --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-common</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + <optional>true</optional> + </dependency> + + <!-- Avro --> + <dependency> <groupId>org.apache.avro</groupId> <artifactId>avro</artifactId> @@ -62,80 +75,49 @@ under the License. 
<optional>true</optional> </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-table-common</artifactId> - <version>${project.version}</version> - <scope>provided</scope> - </dependency> - - <!-- test dependencies --> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-table-common</artifactId> - <version>${project.version}</version> - <scope>test</scope> - <type>test-jar</type> - </dependency> + <!-- Tests --> - <!-- TODO This could be dropped if we move the Table Avro IT Case --> <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-table-planner_${scala.binary.version}</artifactId> + <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId> <version>${project.version}</version> <scope>test</scope> </dependency> - <!-- Avro RowData schema test dependency --> <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId> + <artifactId>flink-test-utils_${scala.binary.version}</artifactId> <version>${project.version}</version> <scope>test</scope> </dependency> - <!-- TODO This could be dropped if we move the Table Avro IT Case --> <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-table-planner_${scala.binary.version}</artifactId> + <artifactId>flink-core</artifactId> <version>${project.version}</version> <scope>test</scope> <type>test-jar</type> </dependency> + <!-- We need this for the patched FlinkScalaKryoInstantiator --> <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - 
<artifactId>flink-test-utils_${scala.binary.version}</artifactId> + <artifactId>flink-runtime_${scala.binary.version}</artifactId> <version>${project.version}</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-core</artifactId> + <artifactId>flink-table-common</artifactId> <version>${project.version}</version> <scope>test</scope> <type>test-jar</type> </dependency> - <!-- We need this for the patched FlinkScalaKryoInstantiator --> <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-runtime_${scala.binary.version}</artifactId> + <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId> <version>${project.version}</version> <scope>test</scope> </dependency> diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java index b8df7d9..e28d3dc 100644 --- a/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java @@ -27,20 +27,17 @@ import org.apache.flink.formats.avro.generated.User; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamUtils; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Expressions; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.expressions.Expression; -import org.apache.flink.table.runtime.utils.TableProgramsClusterTestBase; +import org.apache.flink.test.util.AbstractTestBase; import org.apache.flink.test.util.TestBaseUtils; import org.apache.flink.types.Row; import 
org.apache.flink.util.CollectionUtil; import org.apache.avro.util.Utf8; import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; import java.math.BigDecimal; import java.nio.ByteBuffer; @@ -59,8 +56,7 @@ import static org.apache.flink.table.api.Expressions.$; import static org.junit.Assert.assertEquals; /** Tests for interoperability with Avro types. */ -@RunWith(Parameterized.class) -public class AvroTypesITCase extends TableProgramsClusterTestBase { +public class AvroTypesITCase extends AbstractTestBase { private static final User USER_1 = User.newBuilder() @@ -160,16 +156,10 @@ public class AvroTypesITCase extends TableProgramsClusterTestBase { new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray())) .build(); - public AvroTypesITCase(TestExecutionMode executionMode, TableConfigMode tableConfigMode) { - super(executionMode, tableConfigMode); - } - @Test public void testAvroToRow() { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - StreamTableEnvironment tEnv = - StreamTableEnvironment.create( - env, EnvironmentSettings.newInstance().useBlinkPlanner().build()); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); DataStream<User> ds = testData(env); Table t = tEnv.fromDataStream(ds, selectFields(ds)); @@ -178,33 +168,29 @@ public class AvroTypesITCase extends TableProgramsClusterTestBase { List<Row> results = CollectionUtil.iteratorToList( DataStreamUtils.collect(tEnv.toAppendStream(result, Row.class))); + // TODO we should get an Avro record here instead of a nested row. 
+ // This should be fixed with FLIP-136 String expected = - "black,null,Whatever,[true],[hello],true,java.nio.HeapByteBuffer[pos=0 lim=10 cap=10]," - + "2014-03-01,java.nio.HeapByteBuffer[pos=0 lim=2 cap=2],[7, -48],0.0,GREEN," - + "[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],42,{},null,null,null,00:00:00.123456," - + "12:12:12,1970-01-01T00:00:00.123456Z,2014-03-01T12:12:12.321Z,null\n" - + "blue,null,Charlie,[],[],false,java.nio.HeapByteBuffer[pos=0 lim=10 cap=10],2014-03-01," - + "java.nio.HeapByteBuffer[pos=0 lim=2 cap=2],[7, -48],1.337,RED,null,1337,{}," - + - // TODO we should get an Avro record here instead of a nested row. This - // should be fixed - // with FLIP-136 - "Berlin,42,Berlin,Bakerstreet,12049,null,null,00:00:00.123456,12:12:12,1970-01-01T00:00:00.123456Z," - + "2014-03-01T12:12:12.321Z,null\n" - + "yellow,null,Terminator,[false],[world],false," - + "java.nio.HeapByteBuffer[pos=0 lim=10 cap=10],2014-03-01," - + "java.nio.HeapByteBuffer[pos=0 lim=2 cap=2],[7, -48],0.0,GREEN," - + "[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],1,{},null,null,null,00:00:00.123456," - + "12:12:12,1970-01-01T00:00:00.123456Z,2014-03-01T12:12:12.321Z,null"; + "+I[black, null, Whatever, [true], [hello], true, java.nio.HeapByteBuffer[pos=0 lim=10 cap=10], " + + "2014-03-01, java.nio.HeapByteBuffer[pos=0 lim=2 cap=2], [7, -48], 0.0, GREEN, " + + "[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], 42, {}, null, null, null, 00:00:00.123456, " + + "12:12:12, 1970-01-01T00:00:00.123456Z, 2014-03-01T12:12:12.321Z, null]\n" + + "+I[blue, null, Charlie, [], [], false, java.nio.HeapByteBuffer[pos=0 lim=10 cap=10], 2014-03-01, " + + "java.nio.HeapByteBuffer[pos=0 lim=2 cap=2], [7, -48], 1.337, RED, null, 1337, {}, " + + "+I[Berlin, 42, Berlin, Bakerstreet, 12049], null, null, 00:00:00.123456, 12:12:12, 1970-01-01T00:00:00.123456Z, " + + "2014-03-01T12:12:12.321Z, null]\n" + + "+I[yellow, null, Terminator, [false], [world], false, " + + "java.nio.HeapByteBuffer[pos=0 lim=10 
cap=10], 2014-03-01, " + + "java.nio.HeapByteBuffer[pos=0 lim=2 cap=2], [7, -48], 0.0, GREEN, " + + "[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], 1, {}, null, null, null, 00:00:00.123456, " + + "12:12:12, 1970-01-01T00:00:00.123456Z, 2014-03-01T12:12:12.321Z, null]"; TestBaseUtils.compareResultAsText(results, expected); } @Test - public void testAvroStringAccess() throws Exception { + public void testAvroStringAccess() { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - StreamTableEnvironment tEnv = - StreamTableEnvironment.create( - env, EnvironmentSettings.newInstance().useBlinkPlanner().build()); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); DataStream<User> ds = testData(env); Table t = tEnv.fromDataStream(ds, selectFields(ds)); @@ -221,9 +207,7 @@ public class AvroTypesITCase extends TableProgramsClusterTestBase { @Test public void testAvroObjectAccess() { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - StreamTableEnvironment tEnv = - StreamTableEnvironment.create( - env, EnvironmentSettings.newInstance().useBlinkPlanner().build()); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); DataStream<User> ds = testData(env); Table t = tEnv.fromDataStream(ds, selectFields(ds)); @@ -242,9 +226,7 @@ public class AvroTypesITCase extends TableProgramsClusterTestBase { @Test public void testAvroToAvro() { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - StreamTableEnvironment tEnv = - StreamTableEnvironment.create( - env, EnvironmentSettings.newInstance().useBlinkPlanner().build()); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); DataStream<User> ds = testData(env); Table t = tEnv.fromDataStream(ds, selectFields(ds));