twalthr commented on a change in pull request #13373:
URL: https://github.com/apache/flink/pull/13373#discussion_r487029161



##########
File path: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java
##########
@@ -176,14 +181,25 @@ public void getFlatComparator(List<TypeComparator> flatComparators) {
         */
        public final Object accessField(Field field, Object object) {
                try {
-                       object = field.get(object);
+                       if (field.isAccessible()) {

Review comment:
       Wouldn't it be easier (and also more performant) to simply make the field accessible in the constructor of the comparator? Going through all of these methods for every field access is not very elegant.
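
   For illustration, a minimal sketch of that idea (hedged: class and field names are simplified, not the actual `PojoComparator` internals):

```java
import java.lang.reflect.Field;

final class FieldAccessSketch {

	private final Field[] keyFields;

	FieldAccessSketch(Field[] keyFields) {
		this.keyFields = keyFields;
		// Pay the accessibility cost once, up front, instead of
		// re-checking isAccessible() on every single field access.
		for (Field field : keyFields) {
			field.setAccessible(true);
		}
	}

	Object accessField(Field field, Object object) {
		try {
			// The hot path stays a plain reflective read.
			return field.get(object);
		} catch (IllegalAccessException e) {
			throw new RuntimeException(
				"Unreachable: the field was made accessible in the constructor.", e);
		}
	}
}
```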

##########
File path: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
##########
@@ -218,7 +231,7 @@ private Object convertAvroType(Schema schema, TypeInformation<?> info, Object ob
                switch (schema.getType()) {
                        case RECORD:
                                if (object instanceof IndexedRecord) {
-                                       return convertAvroRecordToRow(schema, (RowTypeInfo) info, (IndexedRecord) object);
+                                       return convertAvroRecordToRow(schema, (RowTypeInfo) info, (IndexedRecord) object, jodaConverter);

Review comment:
       We can simply access the member field instead of passing it through the methods, no?

##########
File path: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToRowDataConverters.java
##########
@@ -101,7 +106,7 @@ private static AvroToRowDataConverter createNullableConverter(LogicalType type)
        /**
         * Creates a runtime converter which assuming input object is not null.
         */
-       private static AvroToRowDataConverter createConverter(LogicalType type) {
+       private static AvroToRowDataConverter createConverter(LogicalType type, @Nullable JodaConverter jodaConverter) {

Review comment:
       Can't we simply access JodaConverter where needed as a singleton? I would rather not pollute these methods.
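
   A sketch of the singleton-style access (assuming `JodaConverter`'s existing static `getConverter()` accessor, which returns `null` when Joda-Time is not on the classpath):

```java
import javax.annotation.Nullable;

import org.apache.flink.formats.avro.utils.JodaConverter;

final class TimestampConversionSketch {

	private TimestampConversionSketch() {}

	/** Converts an Avro timestamp-millis value to epoch millis without threading a converter parameter. */
	static long toEpochMillis(Object object) {
		if (object instanceof Long) {
			return (Long) object;
		}
		// Fetch the converter where it is needed instead of passing it
		// through every create* method.
		@Nullable JodaConverter jodaConverter = JodaConverter.getConverter();
		if (jodaConverter != null) {
			return jodaConverter.convertTimestamp(object);
		}
		throw new IllegalArgumentException(
			"Unexpected object type for timestamp: " + object.getClass());
	}
}
```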

##########
File path: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
##########
@@ -340,10 +357,22 @@ private Time convertToTime(Object object, @Nullable JodaConverter jodaConverter)
                return new Time(millis - LOCAL_TZ.getOffset(millis));
        }
 
-       private Timestamp convertToTimestamp(Object object) {
+       private Timestamp convertToTimestamp(Object object, @Nullable JodaConverter jodaConverter, boolean isMicros) {
                final long millis;
                if (object instanceof Long) {
-                       millis = (Long) object;
+                       if (isMicros) {
+                               long micros = (Long) object;
+                               Instant instant = Instant.ofEpochSecond(0)

Review comment:
       Maybe too much optimization, but should we just do the long arithmetic ourselves here and below? We are creating a lot of objects on the hot path.
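
   A sketch of the pure long-arithmetic variant (hedged: names are illustrative, and the LOCAL_TZ offset handling of the surrounding method is left out; `floorDiv`/`floorMod` keep pre-epoch values correct):

```java
import java.sql.Timestamp;

final class MicrosToTimestampSketch {

	private MicrosToTimestampSketch() {}

	/** Builds a Timestamp from epoch microseconds without allocating an Instant. */
	static Timestamp fromMicros(long micros) {
		// Whole seconds since epoch and the microsecond remainder within that second.
		long seconds = Math.floorDiv(micros, 1_000_000L);
		int nanosOfSecond = (int) (Math.floorMod(micros, 1_000_000L) * 1_000L);
		// seconds * 1000 is an exact millisecond multiple, so the
		// constructor leaves the fractional second at zero...
		Timestamp timestamp = new Timestamp(seconds * 1000L);
		// ...and setNanos installs the full fraction, including microseconds.
		timestamp.setNanos(nanosOfSecond);
		return timestamp;
	}
}
```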

##########
File path: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
##########
@@ -272,11 +276,14 @@ private static DataType convertToDataType(Schema schema) {
                                return DataTypes.TIMESTAMP(3)
                                                .bridgedTo(java.sql.Timestamp.class)
                                                .notNull();
-                       }
-                       if (schema.getLogicalType() == LogicalTypes.timestampMicros()) {
+                       } else if (schema.getLogicalType() == LogicalTypes.timestampMicros()) {
                                return DataTypes.TIMESTAMP(6)
                                                .bridgedTo(java.sql.Timestamp.class)
                                                .notNull();
+                       } else if (schema.getLogicalType() == LogicalTypes.timeMicros()) {
+                               return DataTypes.TIME(6)
+                                       .bridgedTo(LocalTime.class)

Review comment:
       Shouldn't this be a SQL type like the others? Is this method actually 
used?
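
   For comparison, a SQL-bridged branch in the style of the surrounding cases might look like the following (purely illustrative; note that `java.sql.Time` only carries millisecond precision, which may be why `LocalTime` was chosen here):

```java
} else if (schema.getLogicalType() == LogicalTypes.timeMicros()) {
	return DataTypes.TIME(6)
			.bridgedTo(java.sql.Time.class)
			.notNull();
}
```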

##########
File path: flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java
##########
@@ -193,7 +193,7 @@ private void validateUserSchema(DataType actual) {
                                DataTypes.FIELD("type_bytes", 
DataTypes.ARRAY(DataTypes.TINYINT().bridgedTo(Byte.class)).notNull()),
                                DataTypes.FIELD("type_date", 
DataTypes.DATE().bridgedTo(java.sql.Date.class).notNull()),
                                DataTypes.FIELD("type_time_millis", 
DataTypes.TIME().bridgedTo(java.sql.Time.class).notNull()),
-                               DataTypes.FIELD("type_time_micros", 
DataTypes.INT().notNull()),
+                               DataTypes.FIELD("type_time_micros", 
DataTypes.TIME(6).notNull()),

Review comment:
       Same question as above: why is this not `sql.Time`?

##########
File path: flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java
##########
@@ -58,7 +55,7 @@
 @RunWith(Parameterized.class)
 public class AvroTypesITCase extends TableProgramsClusterTestBase {
 
-       private static final User USER_1 = User.newBuilder()
+       private static final SimpleUser USER_1 = SimpleUser.newBuilder()

Review comment:
       But it would still be nice to test all types for Avro, maybe against the new Blink planner. Could we at least have a separate commit for this class? Then we could revert the changes, because after FLIP-136 this test should definitely work again, no? Or does it work already today?

##########
File path: flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java
##########
@@ -58,7 +55,7 @@
 @RunWith(Parameterized.class)
 public class AvroTypesITCase extends TableProgramsClusterTestBase {
 
-       private static final User USER_1 = User.newBuilder()
+       private static final SimpleUser USER_1 = SimpleUser.newBuilder()

Review comment:
       Thanks for the explanation.

##########
File path: flink-formats/pom.xml
##########
@@ -81,6 +81,7 @@ under the License.
                        <modules>
                                <module>flink-sql-orc</module>
                                <module>flink-sql-parquet</module>
+                               <module>flink-sql-avro</module>

Review comment:
       The `org.apache.flink.tests.util.kafka.SQLClientKafkaITCase` should not 
need to download the dependencies anymore.

##########
File path: flink-formats/flink-sql-avro/pom.xml
##########
@@ -0,0 +1,85 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+               xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+       
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-formats</artifactId>
+               <version>1.12-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <artifactId>flink-sql-avro</artifactId>
+       <name>Flink : Formats : SQL Avro</name>
+
+       <packaging>jar</packaging>
+
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-avro</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+       </dependencies>
+
+       <build>
+               <plugins>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-shade-plugin</artifactId>
+                               <executions>
+                                       <execution>
+                                               <id>shade-flink</id>
+                                               <phase>package</phase>
+                                               <goals>
+                                                       <goal>shade</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <shadeTestJar>false</shadeTestJar>
+                                                       <artifactSet>
+                                                               <includes>
+                                                                       <include>org.apache.flink:flink-avro</include>
+                                                                       <include>org.apache.avro:avro</include>
+                                                                       <include>com.fasterxml.jackson.core:jackson-core</include>
+                                                                       <include>com.fasterxml.jackson.core:jackson-databind</include>
+                                                                       <include>com.fasterxml.jackson.core:jackson-annotations</include>
+                                                                       <include>org.apache.commons:commons-compress</include>
+                                                               </includes>
+                                                       </artifactSet>
+                                                       <relocations>
+                                                               <relocation>
+                                                                       <pattern>com.fasterxml.jackson</pattern>
+                                                                       <shadedPattern>org.apache.flink.com.fasterxml.jackson</shadedPattern>

Review comment:
       We have a shading pattern in Flink like:
   `org.apache.flink.elasticsearch6.shaded.org.apache.commons`
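
   Following that convention, the relocation might look like this (hedged: the exact `org.apache.flink.avro.shaded` prefix is an assumption modeled on the elasticsearch6 example, not a confirmed naming choice):

```xml
<relocation>
	<pattern>com.fasterxml.jackson</pattern>
	<!-- assumed prefix, mirroring org.apache.flink.elasticsearch6.shaded.* -->
	<shadedPattern>org.apache.flink.avro.shaded.com.fasterxml.jackson</shadedPattern>
</relocation>
```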

##########
File path: flink-formats/pom.xml
##########
@@ -81,6 +81,7 @@ under the License.
                        <modules>
                                <module>flink-sql-orc</module>
                                <module>flink-sql-parquet</module>
+                               <module>flink-sql-avro</module>

Review comment:
       Please also update https://flink.apache.org/downloads.html (Optional components) and https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/avro.html (the Download link and the mention of Hadoop 😔).


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

