This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 957018c  [FLINK-22811][format-avro] Drop usages of legacy planner in Avro module
957018c is described below

commit 957018cfd3a61507a2b100438fbee5939abe50e1
Author: Timo Walther <twal...@apache.org>
AuthorDate: Mon May 31 16:22:56 2021 +0200

    [FLINK-22811][format-avro] Drop usages of legacy planner in Avro module
    
    This closes #16032.
---
 flink-formats/flink-avro/pom.xml                   | 62 ++++++++--------------
 .../flink/table/runtime/batch/AvroTypesITCase.java | 62 ++++++++--------------
 2 files changed, 44 insertions(+), 80 deletions(-)

diff --git a/flink-formats/flink-avro/pom.xml b/flink-formats/flink-avro/pom.xml
index 06625bd..50c7ec8 100644
--- a/flink-formats/flink-avro/pom.xml
+++ b/flink-formats/flink-avro/pom.xml
@@ -36,7 +36,7 @@ under the License.
 
        <dependencies>
 
-               <!-- core dependencies -->
+               <!-- Core -->
 
                <dependency>
                        <groupId>org.apache.flink</groupId>
@@ -45,6 +45,19 @@ under the License.
                        <scope>provided</scope>
                </dependency>
 
+               <!-- Table ecosystem -->
+
+               <!-- Projects depending on this project won't depend on flink-table-*. -->
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table-common</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+                       <optional>true</optional>
+               </dependency>
+
+               <!-- Avro -->
+
                <dependency>
                        <groupId>org.apache.avro</groupId>
                        <artifactId>avro</artifactId>
@@ -62,80 +75,49 @@ under the License.
                        <optional>true</optional>
                </dependency>
 
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-table-common</artifactId>
-                       <version>${project.version}</version>
-                       <scope>provided</scope>
-               </dependency>
-
-               <!-- test dependencies -->
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-table-common</artifactId>
-                       <version>${project.version}</version>
-                       <scope>test</scope>
-                       <type>test-jar</type>
-               </dependency>
+               <!-- Tests -->
 
-               <!-- TODO This could be dropped if we move the Table Avro IT Case -->
                <dependency>
                        <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
-                       <version>${project.version}</version>
-                       <scope>test</scope>
-               </dependency>
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+                       <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
                        <version>${project.version}</version>
                        <scope>test</scope>
                </dependency>
 
-               <!-- Avro RowData schema test dependency -->
                <dependency>
                        <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
+                       <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
                        <version>${project.version}</version>
                        <scope>test</scope>
                </dependency>
 
-               <!-- TODO This could be dropped if we move the Table Avro IT Case -->
                <dependency>
                        <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+                       <artifactId>flink-core</artifactId>
                        <version>${project.version}</version>
                        <scope>test</scope>
                        <type>test-jar</type>
                </dependency>
 
+               <!-- We need this for the patched FlinkScalaKryoInstantiator -->
                <dependency>
                        <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
-                       <version>${project.version}</version>
-                       <scope>test</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+                       <artifactId>flink-runtime_${scala.binary.version}</artifactId>
                        <version>${project.version}</version>
                        <scope>test</scope>
                </dependency>
 
                <dependency>
                        <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-core</artifactId>
+                       <artifactId>flink-table-common</artifactId>
                        <version>${project.version}</version>
                        <scope>test</scope>
                        <type>test-jar</type>
                </dependency>
 
-               <!-- We need this for the patched FlinkScalaKryoInstantiator -->
                <dependency>
                        <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-runtime_${scala.binary.version}</artifactId>
+                       <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
                        <version>${project.version}</version>
                        <scope>test</scope>
                </dependency>
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java
index b8df7d9..e28d3dc 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java
@@ -27,20 +27,17 @@ import org.apache.flink.formats.avro.generated.User;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamUtils;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.Expressions;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.runtime.utils.TableProgramsClusterTestBase;
+import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CollectionUtil;
 
 import org.apache.avro.util.Utf8;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
 
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
@@ -59,8 +56,7 @@ import static org.apache.flink.table.api.Expressions.$;
 import static org.junit.Assert.assertEquals;
 
 /** Tests for interoperability with Avro types. */
-@RunWith(Parameterized.class)
-public class AvroTypesITCase extends TableProgramsClusterTestBase {
+public class AvroTypesITCase extends AbstractTestBase {
 
     private static final User USER_1 =
             User.newBuilder()
@@ -160,16 +156,10 @@ public class AvroTypesITCase extends TableProgramsClusterTestBase {
                             new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
                     .build();
 
-    public AvroTypesITCase(TestExecutionMode executionMode, TableConfigMode tableConfigMode) {
-        super(executionMode, tableConfigMode);
-    }
-
     @Test
     public void testAvroToRow() {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        StreamTableEnvironment tEnv =
-                StreamTableEnvironment.create(
-                        env, EnvironmentSettings.newInstance().useBlinkPlanner().build());
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
 
         DataStream<User> ds = testData(env);
         Table t = tEnv.fromDataStream(ds, selectFields(ds));
@@ -178,33 +168,29 @@ public class AvroTypesITCase extends TableProgramsClusterTestBase {
         List<Row> results =
                 CollectionUtil.iteratorToList(
                         DataStreamUtils.collect(tEnv.toAppendStream(result, Row.class)));
+        // TODO we should get an Avro record here instead of a nested row.
+        //  This should be fixed with FLIP-136
         String expected =
-                "black,null,Whatever,[true],[hello],true,java.nio.HeapByteBuffer[pos=0 lim=10 cap=10],"
-                        + "2014-03-01,java.nio.HeapByteBuffer[pos=0 lim=2 cap=2],[7, -48],0.0,GREEN,"
-                        + "[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],42,{},null,null,null,00:00:00.123456,"
-                        + "12:12:12,1970-01-01T00:00:00.123456Z,2014-03-01T12:12:12.321Z,null\n"
-                        + "blue,null,Charlie,[],[],false,java.nio.HeapByteBuffer[pos=0 lim=10 cap=10],2014-03-01,"
-                        + "java.nio.HeapByteBuffer[pos=0 lim=2 cap=2],[7, -48],1.337,RED,null,1337,{},"
-                        +
-                        // TODO we should get an Avro record here instead of a nested row. This
-                        // should be fixed
-                        // with FLIP-136
-                        "Berlin,42,Berlin,Bakerstreet,12049,null,null,00:00:00.123456,12:12:12,1970-01-01T00:00:00.123456Z,"
-                        + "2014-03-01T12:12:12.321Z,null\n"
-                        + "yellow,null,Terminator,[false],[world],false,"
-                        + "java.nio.HeapByteBuffer[pos=0 lim=10 cap=10],2014-03-01,"
-                        + "java.nio.HeapByteBuffer[pos=0 lim=2 cap=2],[7, -48],0.0,GREEN,"
-                        + "[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],1,{},null,null,null,00:00:00.123456,"
-                        + "12:12:12,1970-01-01T00:00:00.123456Z,2014-03-01T12:12:12.321Z,null";
+                "+I[black, null, Whatever, [true], [hello], true, java.nio.HeapByteBuffer[pos=0 lim=10 cap=10], "
+                        + "2014-03-01, java.nio.HeapByteBuffer[pos=0 lim=2 cap=2], [7, -48], 0.0, GREEN, "
+                        + "[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], 42, {}, null, null, null, 00:00:00.123456, "
+                        + "12:12:12, 1970-01-01T00:00:00.123456Z, 2014-03-01T12:12:12.321Z, null]\n"
+                        + "+I[blue, null, Charlie, [], [], false, java.nio.HeapByteBuffer[pos=0 lim=10 cap=10], 2014-03-01, "
+                        + "java.nio.HeapByteBuffer[pos=0 lim=2 cap=2], [7, -48], 1.337, RED, null, 1337, {}, "
+                        + "+I[Berlin, 42, Berlin, Bakerstreet, 12049], null, null, 00:00:00.123456, 12:12:12, 1970-01-01T00:00:00.123456Z, "
+                        + "2014-03-01T12:12:12.321Z, null]\n"
+                        + "+I[yellow, null, Terminator, [false], [world], false, "
+                        + "java.nio.HeapByteBuffer[pos=0 lim=10 cap=10], 2014-03-01, "
+                        + "java.nio.HeapByteBuffer[pos=0 lim=2 cap=2], [7, -48], 0.0, GREEN, "
+                        + "[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], 1, {}, null, null, null, 00:00:00.123456, "
+                        + "12:12:12, 1970-01-01T00:00:00.123456Z, 2014-03-01T12:12:12.321Z, null]";
         TestBaseUtils.compareResultAsText(results, expected);
     }
 
     @Test
-    public void testAvroStringAccess() throws Exception {
+    public void testAvroStringAccess() {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        StreamTableEnvironment tEnv =
-                StreamTableEnvironment.create(
-                        env, EnvironmentSettings.newInstance().useBlinkPlanner().build());
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
 
         DataStream<User> ds = testData(env);
         Table t = tEnv.fromDataStream(ds, selectFields(ds));
@@ -221,9 +207,7 @@ public class AvroTypesITCase extends TableProgramsClusterTestBase {
     @Test
     public void testAvroObjectAccess() {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        StreamTableEnvironment tEnv =
-                StreamTableEnvironment.create(
-                        env, EnvironmentSettings.newInstance().useBlinkPlanner().build());
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
 
         DataStream<User> ds = testData(env);
         Table t = tEnv.fromDataStream(ds, selectFields(ds));
@@ -242,9 +226,7 @@ public class AvroTypesITCase extends TableProgramsClusterTestBase {
     @Test
     public void testAvroToAvro() {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        StreamTableEnvironment tEnv =
-                StreamTableEnvironment.create(
-                        env, EnvironmentSettings.newInstance().useBlinkPlanner().build());
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
 
         DataStream<User> ds = testData(env);
         Table t = tEnv.fromDataStream(ds, selectFields(ds));

Reply via email to