[GitHub] flink pull request #6082: [FLINK-9444][table] KafkaAvroTableSource failed to...
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/6082

---
[GitHub] flink pull request #6082: [FLINK-9444][table] KafkaAvroTableSource failed to...
Github user tragicjun commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6082#discussion_r193168591

    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroRecordClassConverter.java ---
    @@ -73,9 +75,37 @@ private AvroRecordClassConverter() {
                final GenericTypeInfo genericTypeInfo = (GenericTypeInfo) extracted;
                if (genericTypeInfo.getTypeClass() == Utf8.class) {
                    return BasicTypeInfo.STRING_TYPE_INFO;
    +           } else if (genericTypeInfo.getTypeClass() == Map.class) {
    +               // avro map key is always string
    +               return Types.MAP(Types.STRING,
    +                   convertPrimitiveType(schema.getValueType().getType()));
    --- End diff --

    I've implemented a reflection-based version, which now supports record types nested within maps and arrays.

---
[GitHub] flink pull request #6082: [FLINK-9444][table] KafkaAvroTableSource failed to...
Github user tragicjun commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6082#discussion_r193137905

    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroRecordClassConverter.java ---
    @@ -73,9 +75,37 @@ private AvroRecordClassConverter() {
                final GenericTypeInfo genericTypeInfo = (GenericTypeInfo) extracted;
                if (genericTypeInfo.getTypeClass() == Utf8.class) {
                    return BasicTypeInfo.STRING_TYPE_INFO;
    +           } else if (genericTypeInfo.getTypeClass() == Map.class) {
    +               // avro map key is always string
    +               return Types.MAP(Types.STRING,
    +                   convertPrimitiveType(schema.getValueType().getType()));
    --- End diff --

    If the value is not primitive, say another record, how can we obtain the **TypeInformation extracted** argument? One solution is to take the full class name of the map value type, resolve the class via reflection, and pass that class to **convert(Class avroClass)**. Any better ideas?

---
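The reflection idea in the comment above can be sketched roughly as follows. This is a minimal, self-contained illustration and not the code from the pull request: `resolveByName` and the `convert` stub are hypothetical stand-ins, and `java.util.ArrayList` stands in for a generated Avro record class. It assumes the value schema's full name equals the name of a class on the classpath.

```java
/** Sketch only: resolves a class by name and hands it to a converter stub. */
class ReflectiveClassLookup {

    /** Hypothetical helper: looks up a class from its fully qualified name. */
    static Class<?> resolveByName(String fullName) {
        try {
            return Class.forName(fullName);
        } catch (ClassNotFoundException e) {
            // Fail loudly when the named class is not on the classpath.
            throw new IllegalArgumentException("No class on the classpath for: " + fullName, e);
        }
    }

    /** Hypothetical stand-in for convert(Class avroClass); it only reports the class. */
    static String convert(Class<?> avroClass) {
        return "TypeInformation for " + avroClass.getSimpleName();
    }

    public static void main(String[] args) {
        // java.util.ArrayList stands in for the name of a generated Avro record class.
        System.out.println(convert(resolveByName("java.util.ArrayList")));
    }
}
```

One trade-off of this approach is that it only works when a class for the nested type exists and is loadable at conversion time; a purely schema-driven conversion would not have that requirement.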
[GitHub] flink pull request #6082: [FLINK-9444][table] KafkaAvroTableSource failed to...
Github user tragicjun commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6082#discussion_r193132257

    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroRecordClassConverter.java ---
    @@ -73,9 +75,37 @@ private AvroRecordClassConverter() {
                final GenericTypeInfo genericTypeInfo = (GenericTypeInfo) extracted;
                if (genericTypeInfo.getTypeClass() == Utf8.class) {
                    return BasicTypeInfo.STRING_TYPE_INFO;
    +           } else if (genericTypeInfo.getTypeClass() == Map.class) {
    --- End diff --

    Could you explain this in more detail? I didn't find any coupling between AvroRecordClassConverter and AvroRow(De)SerializationSchema. However, I did encounter a "UTF8<->String" cast problem during my integration, and I'm not sure whether I should open a separate issue for it.

---
[GitHub] flink pull request #6082: [FLINK-9444][table] KafkaAvroTableSource failed to...
Github user tragicjun commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6082#discussion_r193102162

    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroRecordClassConverter.java ---
    @@ -73,9 +75,37 @@ private AvroRecordClassConverter() {
                final GenericTypeInfo genericTypeInfo = (GenericTypeInfo) extracted;
                if (genericTypeInfo.getTypeClass() == Utf8.class) {
                    return BasicTypeInfo.STRING_TYPE_INFO;
    +           } else if (genericTypeInfo.getTypeClass() == Map.class) {
    +               // avro map key is always string
    +               return Types.MAP(Types.STRING,
    +                   convertPrimitiveType(schema.getValueType().getType()));
    +           } else if (genericTypeInfo.getTypeClass() == List.class &&
    +               schema.getType() == Schema.Type.ARRAY) {
    --- End diff --

    It is necessary because List.class doesn't imply that the Schema.Type is ARRAY. But I think it would be better to use Schema.Type for this check.

---
[GitHub] flink pull request #6082: [FLINK-9444][table] KafkaAvroTableSource failed to...
Github user tragicjun commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6082#discussion_r193101536

    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroRecordClassConverter.java ---
    @@ -73,9 +75,37 @@ private AvroRecordClassConverter() {
                final GenericTypeInfo genericTypeInfo = (GenericTypeInfo) extracted;
                if (genericTypeInfo.getTypeClass() == Utf8.class) {
                    return BasicTypeInfo.STRING_TYPE_INFO;
    +           } else if (genericTypeInfo.getTypeClass() == Map.class) {
    +               // avro map key is always string
    +               return Types.MAP(Types.STRING,
    +                   convertPrimitiveType(schema.getValueType().getType()));
    --- End diff --

    Do you actually mean the value **might** not be primitive?

---
[GitHub] flink pull request #6082: [FLINK-9444][table] KafkaAvroTableSource failed to...
Github user tragicjun commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6082#discussion_r193092151

    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroRecordClassConverter.java ---
    @@ -73,9 +75,37 @@ private AvroRecordClassConverter() {
                final GenericTypeInfo genericTypeInfo = (GenericTypeInfo) extracted;
                if (genericTypeInfo.getTypeClass() == Utf8.class) {
                    return BasicTypeInfo.STRING_TYPE_INFO;
    +           } else if (genericTypeInfo.getTypeClass() == Map.class) {
    +               // avro map key is always string
    +               return Types.MAP(Types.STRING,
    +                   convertPrimitiveType(schema.getValueType().getType()));
    --- End diff --

    This function requires "TypeInformation extracted, Schema schema", but we can only get "org.apache.avro.Schema.Type" from Avro MapSchema (value type) and ArraySchema (element type).

---
[GitHub] flink pull request #6082: [FLINK-9444][table] KafkaAvroTableSource failed to...
Github user tragicjun commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6082#discussion_r193084082

    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java ---
    @@ -128,6 +130,82 @@ public void testDifferentFieldsAvroClass() {
                source.getDataStream(StreamExecutionEnvironment.getExecutionEnvironment()).getType());
        }

    +   @Test
    +   public void testHasMapFieldsAvroClass() {
    --- End diff --

    The issue is exposed when using KafkaAvroTableSource, but moving the unit tests to AvroRowDeSerializationSchemaTest should be fine.

---
[GitHub] flink pull request #6082: [FLINK-9444][table] KafkaAvroTableSource failed to...
Github user tragicjun commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6082#discussion_r193039834

    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroRecordClassConverter.java ---
    @@ -73,9 +75,37 @@ private AvroRecordClassConverter() {
                final GenericTypeInfo genericTypeInfo = (GenericTypeInfo) extracted;
                if (genericTypeInfo.getTypeClass() == Utf8.class) {
                    return BasicTypeInfo.STRING_TYPE_INFO;
    +           } else if (genericTypeInfo.getTypeClass() == Map.class) {
    +               // avro map key is always string
    +               return Types.MAP(Types.STRING,
    +                   convertPrimitiveType(schema.getValueType().getType()));
    +           } else if (genericTypeInfo.getTypeClass() == List.class &&
    +               schema.getType() == Schema.Type.ARRAY) {
    +               return Types.LIST(convertPrimitiveType(schema.getElementType().getType()));
    --- End diff --

    Yes, org.apache.flink.table.api.Types doesn't support LIST, but org.apache.flink.api.common.typeinfo.Types does. An Avro array is converted to a Java List. Can we add LIST to org.apache.flink.table.api.Types to support Avro arrays?

---
[GitHub] flink pull request #6082: [FLINK-9444][table] KafkaAvroTableSource failed to...
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6082#discussion_r192771700

    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroRecordClassConverter.java ---
    @@ -73,9 +75,37 @@ private AvroRecordClassConverter() {
                final GenericTypeInfo genericTypeInfo = (GenericTypeInfo) extracted;
                if (genericTypeInfo.getTypeClass() == Utf8.class) {
                    return BasicTypeInfo.STRING_TYPE_INFO;
    +           } else if (genericTypeInfo.getTypeClass() == Map.class) {
    +               // avro map key is always string
    +               return Types.MAP(Types.STRING,
    +                   convertPrimitiveType(schema.getValueType().getType()));
    +           } else if (genericTypeInfo.getTypeClass() == List.class &&
    +               schema.getType() == Schema.Type.ARRAY) {
    --- End diff --

    Is this check necessary? If yes, why is it not necessary for Maps?

---
[GitHub] flink pull request #6082: [FLINK-9444][table] KafkaAvroTableSource failed to...
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6082#discussion_r192770567

    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroRecordClassConverter.java ---
    @@ -73,9 +75,37 @@ private AvroRecordClassConverter() {
                final GenericTypeInfo genericTypeInfo = (GenericTypeInfo) extracted;
                if (genericTypeInfo.getTypeClass() == Utf8.class) {
                    return BasicTypeInfo.STRING_TYPE_INFO;
    +           } else if (genericTypeInfo.getTypeClass() == Map.class) {
    +               // avro map key is always string
    +               return Types.MAP(Types.STRING,
    +                   convertPrimitiveType(schema.getValueType().getType()));
    --- End diff --

    The value must not be primitive. Call this function recursively instead?

---
[GitHub] flink pull request #6082: [FLINK-9444][table] KafkaAvroTableSource failed to...
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6082#discussion_r192771502

    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroRecordClassConverter.java ---
    @@ -73,9 +75,37 @@ private AvroRecordClassConverter() {
                final GenericTypeInfo genericTypeInfo = (GenericTypeInfo) extracted;
                if (genericTypeInfo.getTypeClass() == Utf8.class) {
                    return BasicTypeInfo.STRING_TYPE_INFO;
    +           } else if (genericTypeInfo.getTypeClass() == Map.class) {
    +               // avro map key is always string
    +               return Types.MAP(Types.STRING,
    +                   convertPrimitiveType(schema.getValueType().getType()));
    +           } else if (genericTypeInfo.getTypeClass() == List.class &&
    +               schema.getType() == Schema.Type.ARRAY) {
    +               return Types.LIST(convertPrimitiveType(schema.getElementType().getType()));
    --- End diff --

    Call this function recursively. Btw also update the method docs about this behavior.

---
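The recursion suggested in the comment above might look roughly like the sketch below. To stay self-contained it uses a tiny hypothetical schema model (`Kind`, `Node`) rather than Avro's `Schema`, and it returns a descriptive string instead of Flink's `TypeInformation`, so every name here is an assumption made for illustration. The point is only the shape: map values, array elements, and record fields are converted by the same function, so nesting of any depth is handled.

```java
import java.util.Arrays;
import java.util.List;

/** Sketch only: recursive conversion over a stand-in schema model (not Avro's API). */
class RecursiveSchemaConversion {

    /** Hypothetical schema kinds, loosely mirroring a few Avro schema types. */
    enum Kind { STRING, INT, LONG, MAP, ARRAY, RECORD }

    /** Hypothetical schema node; children hold the value, element, or field schemas. */
    static final class Node {
        final Kind kind;
        final List<Node> children;

        Node(Kind kind, Node... children) {
            this.kind = kind;
            this.children = Arrays.asList(children);
        }
    }

    /** Converts a schema node to a type description, recursing into nested schemas. */
    static String convert(Node schema) {
        switch (schema.kind) {
            case STRING:
                return "STRING";
            case INT:
                return "INT";
            case LONG:
                return "LONG";
            case MAP:
                // Map keys are always strings; the value type is converted recursively.
                return "MAP<STRING, " + convert(schema.children.get(0)) + ">";
            case ARRAY:
                // The element type is converted recursively as well.
                return "LIST<" + convert(schema.children.get(0)) + ">";
            case RECORD: {
                // Each field of a nested record goes through the same function.
                StringBuilder sb = new StringBuilder("ROW<");
                for (int i = 0; i < schema.children.size(); i++) {
                    if (i > 0) {
                        sb.append(", ");
                    }
                    sb.append(convert(schema.children.get(i)));
                }
                return sb.append(">").toString();
            }
            default:
                throw new IllegalArgumentException("Unsupported kind: " + schema.kind);
        }
    }

    public static void main(String[] args) {
        // A map whose values are records containing a string and an array of longs.
        Node nested = new Node(Kind.MAP,
            new Node(Kind.RECORD,
                new Node(Kind.STRING),
                new Node(Kind.ARRAY, new Node(Kind.LONG))));
        System.out.println(convert(nested));
    }
}
```

Dispatching on the schema kind, rather than on the Java class of the extracted type, also touches on the earlier question of whether the `List.class` check is needed alongside `Schema.Type.ARRAY`.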
[GitHub] flink pull request #6082: [FLINK-9444][table] KafkaAvroTableSource failed to...
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6082#discussion_r192965275

    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroRecordClassConverter.java ---
    @@ -73,9 +75,37 @@ private AvroRecordClassConverter() {
                final GenericTypeInfo genericTypeInfo = (GenericTypeInfo) extracted;
                if (genericTypeInfo.getTypeClass() == Utf8.class) {
                    return BasicTypeInfo.STRING_TYPE_INFO;
    +           } else if (genericTypeInfo.getTypeClass() == Map.class) {
    --- End diff --

    If you update this converter class, you should also update the corresponding runtime classes in `org.apache.flink.formats.avro.AvroRow(De)SerializationSchema`.

---
[GitHub] flink pull request #6082: [FLINK-9444][table] KafkaAvroTableSource failed to...
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6082#discussion_r192768664

    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java ---
    @@ -128,6 +130,82 @@ public void testDifferentFieldsAvroClass() {
                source.getDataStream(StreamExecutionEnvironment.getExecutionEnvironment()).getType());
        }

    +   @Test
    +   public void testHasMapFieldsAvroClass() {
    --- End diff --

    I think we don't need changes in Kafka-related classes. This is an issue with the `AvroRowDeserializationSchema` and should be covered by the `AvroRowDeSerializationSchemaTest`.

---
[GitHub] flink pull request #6082: [FLINK-9444][table] KafkaAvroTableSource failed to...
Github user suez1224 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6082#discussion_r192955182

    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroRecordClassConverter.java ---
    @@ -73,9 +75,37 @@ private AvroRecordClassConverter() {
                final GenericTypeInfo genericTypeInfo = (GenericTypeInfo) extracted;
                if (genericTypeInfo.getTypeClass() == Utf8.class) {
                    return BasicTypeInfo.STRING_TYPE_INFO;
    +           } else if (genericTypeInfo.getTypeClass() == Map.class) {
    +               // avro map key is always string
    +               return Types.MAP(Types.STRING,
    +                   convertPrimitiveType(schema.getValueType().getType()));
    +           } else if (genericTypeInfo.getTypeClass() == List.class &&
    +               schema.getType() == Schema.Type.ARRAY) {
    +               return Types.LIST(convertPrimitiveType(schema.getElementType().getType()));
    --- End diff --

    I don't think Flink Table & SQL supports LIST; please see org.apache.flink.table.api.Types.

---