This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new d946302 [FLINK-25114][table-runtime] Remove flink-scala dependency and scala suffix d946302 is described below commit d9463022504a6bccad30d681c71f46658c073041 Author: slinkydeveloper <francescogu...@gmail.com> AuthorDate: Wed Dec 1 14:54:36 2021 +0100 [FLINK-25114][table-runtime] Remove flink-scala dependency and scala suffix This closes #18011. --- flink-architecture-tests/pom.xml | 2 +- flink-connectors/flink-connector-hive/pom.xml | 2 +- .../flink-avro-confluent-registry/pom.xml | 2 +- flink-python/pom.xml | 4 +- flink-table/flink-sql-client/pom.xml | 2 +- flink-table/flink-table-planner/pom.xml | 4 +- flink-table/flink-table-runtime/pom.xml | 10 +--- .../table/data/util/DataFormatConverters.java | 66 +++++++++++++++++++--- flink-table/flink-table-uber/pom.xml | 4 +- 9 files changed, 68 insertions(+), 28 deletions(-) diff --git a/flink-architecture-tests/pom.xml b/flink-architecture-tests/pom.xml index 23330f9..d981444 100644 --- a/flink-architecture-tests/pom.xml +++ b/flink-architecture-tests/pom.xml @@ -116,7 +116,7 @@ under the License. <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-table-runtime_${scala.binary.version}</artifactId> + <artifactId>flink-table-runtime</artifactId> <version>${project.version}</version> <scope>test</scope> </dependency> diff --git a/flink-connectors/flink-connector-hive/pom.xml b/flink-connectors/flink-connector-hive/pom.xml index 7b022ab..37f3dea 100644 --- a/flink-connectors/flink-connector-hive/pom.xml +++ b/flink-connectors/flink-connector-hive/pom.xml @@ -147,7 +147,7 @@ under the License. <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-table-runtime_${scala.binary.version}</artifactId> + <artifactId>flink-table-runtime</artifactId> <version>${project.version}</version> <scope>provided</scope> </dependency> diff --git a/flink-formats/flink-avro-confluent-registry/pom.xml b/flink-formats/flink-avro-confluent-registry/pom.xml index 59fc1d1..6dda873 100644 --- a/flink-formats/flink-avro-confluent-registry/pom.xml +++ b/flink-formats/flink-avro-confluent-registry/pom.xml @@ -113,7 +113,7 @@ under the License. </dependency> <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-table-runtime_${scala.binary.version}</artifactId> + <artifactId>flink-table-runtime</artifactId> <version>${project.version}</version> <scope>test</scope> </dependency> diff --git a/flink-python/pom.xml b/flink-python/pom.xml index 8007026..53d0acd 100644 --- a/flink-python/pom.xml +++ b/flink-python/pom.xml @@ -76,7 +76,7 @@ under the License. </dependency> <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-table-runtime_${scala.binary.version}</artifactId> + <artifactId>flink-table-runtime</artifactId> <version>${project.version}</version> <scope>provided</scope> </dependency> @@ -190,7 +190,7 @@ under the License. <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-table-runtime_${scala.binary.version}</artifactId> + <artifactId>flink-table-runtime</artifactId> <version>${project.version}</version> <type>test-jar</type> <scope>test</scope> diff --git a/flink-table/flink-sql-client/pom.xml b/flink-table/flink-sql-client/pom.xml index 6254ae9..44ec8ad 100644 --- a/flink-table/flink-sql-client/pom.xml +++ b/flink-table/flink-sql-client/pom.xml @@ -86,7 +86,7 @@ under the License. <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-table-runtime_${scala.binary.version}</artifactId> + <artifactId>flink-table-runtime</artifactId> <version>${project.version}</version> </dependency> diff --git a/flink-table/flink-table-planner/pom.xml b/flink-table/flink-table-planner/pom.xml index 26912eb..c971579 100644 --- a/flink-table/flink-table-planner/pom.xml +++ b/flink-table/flink-table-planner/pom.xml @@ -113,7 +113,7 @@ under the License. <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-table-runtime_${scala.binary.version}</artifactId> + <artifactId>flink-table-runtime</artifactId> <version>${project.version}</version> </dependency> @@ -269,7 +269,7 @@ under the License. <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-table-runtime_${scala.binary.version}</artifactId> + <artifactId>flink-table-runtime</artifactId> <version>${project.version}</version> <type>test-jar</type> <scope>test</scope> diff --git a/flink-table/flink-table-runtime/pom.xml b/flink-table/flink-table-runtime/pom.xml index badc73a..a6a6f9a 100644 --- a/flink-table/flink-table-runtime/pom.xml +++ b/flink-table/flink-table-runtime/pom.xml @@ -27,7 +27,7 @@ under the License. <relativePath>..</relativePath> </parent> - <artifactId>flink-table-runtime_${scala.binary.version}</artifactId> + <artifactId>flink-table-runtime</artifactId> <name>Flink : Table : Runtime</name> <description> This module contains classes that are required by a task manager for @@ -79,14 +79,6 @@ under the License. </dependency> <dependency> - <!-- Provides the kryo serializer --> - <groupId>org.apache.flink</groupId> - <artifactId>flink-scala_${scala.binary.version}</artifactId> - <version>${project.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> <groupId>org.codehaus.janino</groupId> <artifactId>janino</artifactId> <version>${janino.version}</version> diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/util/DataFormatConverters.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/util/DataFormatConverters.java index 4234183..3d9b3e0 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/util/DataFormatConverters.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/util/DataFormatConverters.java @@ -70,8 +70,12 @@ import org.apache.flink.types.Row; import org.apache.commons.lang3.ArrayUtils; +import javax.annotation.Nullable; + import java.io.Serializable; import java.lang.reflect.Array; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.math.BigDecimal; import java.sql.Date; import java.sql.Time; @@ -86,8 +90,6 @@ import java.util.Map; import java.util.function.Function; import java.util.stream.Stream; -import scala.Product; - import static org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo; import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getFieldCount; import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType; @@ -291,10 +293,16 @@ public class DataFormatConverters { return new RowConverter(fieldTypes); } else if (Tuple.class.isAssignableFrom(clazz)) { return new TupleConverter((Class<Tuple>) clazz, fieldTypes); - } else if (Product.class.isAssignableFrom(clazz)) { + } else if (CaseClassConverter.PRODUCT_CLASS != null + && CaseClassConverter.PRODUCT_CLASS.isAssignableFrom(clazz)) { return new CaseClassConverter((TupleTypeInfoBase) compositeType, fieldTypes); - } else { + } else if (compositeType instanceof PojoTypeInfo) { return new PojoConverter((PojoTypeInfo) compositeType, fieldTypes); + } else { + throw new IllegalStateException( + "Cannot find a converter for type " + + compositeType + + ". If the target should be a converter to scala.Product, then you might have a scala classpath issue."); } case RAW: if (logicalType instanceof RawType) { @@ -1501,10 +1509,13 @@ public class DataFormatConverters { } /** Converter for case class. */ - public static final class CaseClassConverter extends AbstractRowDataConverter<Product> { + public static final class CaseClassConverter extends AbstractRowDataConverter<Object> { private static final long serialVersionUID = -966598627968372952L; + @Nullable private static final Class<?> PRODUCT_CLASS = getProductClass(); + @Nullable private static final Method PRODUCT_ELEMENT_METHOD = getProductElementMethod(); + private final TupleTypeInfoBase t; private final TupleSerializerBase serializer; @@ -1515,21 +1526,58 @@ public class DataFormatConverters { } @Override - RowData toInternalImpl(Product value) { + RowData toInternalImpl(Object value) { GenericRowData genericRow = new GenericRowData(t.getArity()); for (int i = 0; i < t.getArity(); i++) { - genericRow.setField(i, converters[i].toInternal(value.productElement(i))); + genericRow.setField(i, converters[i].toInternal(invokeProductElement(value, i))); } return genericRow; } @Override - Product toExternalImpl(RowData value) { + Object toExternalImpl(RowData value) { Object[] fields = new Object[t.getArity()]; for (int i = 0; i < t.getArity(); i++) { fields[i] = converters[i].toExternal(value, i); } - return (Product) serializer.createInstance(fields); + return serializer.createInstance(fields); + } + + private static Class<?> getProductClass() { + try { + return Class.forName( + "scala.Product", false, Thread.currentThread().getContextClassLoader()); + } catch (ClassNotFoundException e) { + // Ignore, no scala available in the classpath + return null; + } + } + + private static Method getProductElementMethod() { + try { + if (PRODUCT_CLASS == null) { + return null; + } + return PRODUCT_CLASS.getMethod("productElement", int.class); + } catch (NoSuchMethodException e) { + throw new IllegalStateException( + "Cannot find scala.Product#productElement, has Scala changed its public API?", + e); + } + } + + private static Object invokeProductElement(Object value, int i) { + try { + if (PRODUCT_ELEMENT_METHOD == null) { + throw new IllegalStateException( + "PRODUCT_ELEMENT_METHOD is null, but it cannot be as this method should never be invoked if Scala is not in the classpath. Something is wrong with the classpath?"); + } + return PRODUCT_ELEMENT_METHOD.invoke(value, i); + } catch (IllegalAccessException | InvocationTargetException e) { + throw new IllegalStateException( + "Cannot execute scala.Product#productElement, has Scala changed its public API?", + e); + } } } diff --git a/flink-table/flink-table-uber/pom.xml b/flink-table/flink-table-uber/pom.xml index 9b985df..c51f8d2 100644 --- a/flink-table/flink-table-uber/pom.xml +++ b/flink-table/flink-table-uber/pom.xml @@ -80,7 +80,7 @@ under the License. </dependency> <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-table-runtime_${scala.binary.version}</artifactId> + <artifactId>flink-table-runtime</artifactId> <version>${project.version}</version> </dependency> <dependency> @@ -113,7 +113,7 @@ under the License. <include>org.apache.flink:flink-table-api-java-bridge</include> <include>org.apache.flink:flink-table-api-scala-bridge_${scala.binary.version}</include> <include>org.apache.flink:flink-table-planner_${scala.binary.version}</include> - <include>org.apache.flink:flink-table-runtime_${scala.binary.version}</include> + <include>org.apache.flink:flink-table-runtime</include> <include>org.apache.flink:flink-cep</include> </includes> </artifactSet>