[GitHub] flink pull request #6082: [FLINK-9444][table] KafkaAvroTableSource failed to...

2018-07-03 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6082


---


2018-06-05 Thread tragicjun
Github user tragicjun commented on a diff in the pull request:

https://github.com/apache/flink/pull/6082#discussion_r193168591
  
--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroRecordClassConverter.java ---
@@ -73,9 +75,37 @@ private AvroRecordClassConverter() {
final GenericTypeInfo genericTypeInfo = (GenericTypeInfo) extracted;
if (genericTypeInfo.getTypeClass() == Utf8.class) {
return BasicTypeInfo.STRING_TYPE_INFO;
+   } else if (genericTypeInfo.getTypeClass() == Map.class) {
+   // avro map key is always string
+   return Types.MAP(Types.STRING,
+       convertPrimitiveType(schema.getValueType().getType()));
--- End diff --

I've implemented a reflection-based version, which now supports record types
within maps and arrays.


---


2018-06-05 Thread tragicjun
Github user tragicjun commented on a diff in the pull request:

https://github.com/apache/flink/pull/6082#discussion_r193137905
  
--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroRecordClassConverter.java ---
@@ -73,9 +75,37 @@ private AvroRecordClassConverter() {
final GenericTypeInfo genericTypeInfo = (GenericTypeInfo) extracted;
if (genericTypeInfo.getTypeClass() == Utf8.class) {
return BasicTypeInfo.STRING_TYPE_INFO;
+   } else if (genericTypeInfo.getTypeClass() == Map.class) {
+   // avro map key is always string
+   return Types.MAP(Types.STRING,
+       convertPrimitiveType(schema.getValueType().getType()));
--- End diff --

If the value is not primitive, say another record, how can we get the
**TypeInformation extracted**? One solution is to get the full class name of
the map value type, use reflection to load that class, and pass it to
**convert(Class avroClass)**. Any better ideas?
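A minimal sketch of the reflection idea, assuming the generated SpecificRecord classes are on the classpath and that each generated class's binary name equals the Avro full name (namespace plus record name). `RecordClassResolver` is a hypothetical helper, not Flink code; in the converter the name would presumably come from `schema.getValueType().getFullName()` (or `getElementType()` for arrays), and the resolved class would then be passed to `convert(Class avroClass)`.

```java
import java.util.Objects;

/**
 * Hypothetical helper: resolves the generated Java class of a nested Avro
 * record from its full name. Assumes the class is on the classpath and its
 * binary name equals the Avro full name.
 */
final class RecordClassResolver {

    private RecordClassResolver() {}

    static Class<?> resolve(String fullName) {
        Objects.requireNonNull(fullName, "fullName");
        // Prefer the context class loader (user code), fall back to our own.
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        if (loader == null) {
            loader = RecordClassResolver.class.getClassLoader();
        }
        try {
            // initialize = false: we only need the Class object, not its static init
            return Class.forName(fullName, false, loader);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException(
                    "No class found for Avro record " + fullName, e);
        }
    }
}
```

Failing with a descriptive exception keeps a schema/classpath mismatch from surfacing later as an opaque `GenericTypeInfo`.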


---


2018-06-05 Thread tragicjun
Github user tragicjun commented on a diff in the pull request:

https://github.com/apache/flink/pull/6082#discussion_r193132257
  
--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroRecordClassConverter.java ---
@@ -73,9 +75,37 @@ private AvroRecordClassConverter() {
final GenericTypeInfo genericTypeInfo = (GenericTypeInfo) extracted;
if (genericTypeInfo.getTypeClass() == Utf8.class) {
return BasicTypeInfo.STRING_TYPE_INFO;
+   } else if (genericTypeInfo.getTypeClass() == Map.class) {
--- End diff --

Could you explain this in more detail? I didn't find any coupling between
AvroRecordClassConverter and AvroRow(De)SerializationSchema. However, I did
encounter a "UTF8<->String" cast problem during my integration, and I was not
sure whether I should open a separate issue for it.
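On the "UTF8<->String" cast problem: by default Avro's readers return string values, including map keys, as `org.apache.avro.util.Utf8`, which implements `CharSequence` but is not a `java.lang.String`, so a direct cast fails. A minimal runtime sketch under that assumption copies the value and converts every `CharSequence` to a `String`, descending into maps and lists. `Utf8Normalizer` is hypothetical, and plain JDK `CharSequence` types stand in for `Utf8` so the sketch runs without Avro.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical runtime helper: returns a copy of an Avro-read value in which
 * every CharSequence (e.g. Avro's Utf8) has been replaced by a String,
 * including map keys and values and list elements at any depth.
 */
final class Utf8Normalizer {

    private Utf8Normalizer() {}

    static Object normalize(Object value) {
        if (value == null || value instanceof String) {
            return value;
        }
        if (value instanceof CharSequence) {
            return value.toString();  // Utf8.toString() yields a java.lang.String
        }
        if (value instanceof Map) {
            Map<String, Object> copy = new LinkedHashMap<>();
            for (Map.Entry<?, ?> e : ((Map<?, ?>) value).entrySet()) {
                // Avro map keys are always strings, but may arrive as Utf8
                copy.put(e.getKey().toString(), normalize(e.getValue()));
            }
            return copy;
        }
        if (value instanceof List) {
            List<Object> copy = new ArrayList<>();
            for (Object element : (List<?>) value) {
                copy.add(normalize(element));
            }
            return copy;
        }
        return value;  // numbers, booleans, records, ... are left untouched
    }
}
```

Doing this in the deserialization schema keeps the runtime values consistent with the `STRING` type the converter already reports for `Utf8`.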


---


2018-06-05 Thread tragicjun
Github user tragicjun commented on a diff in the pull request:

https://github.com/apache/flink/pull/6082#discussion_r193102162
  
--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroRecordClassConverter.java ---
@@ -73,9 +75,37 @@ private AvroRecordClassConverter() {
final GenericTypeInfo genericTypeInfo = (GenericTypeInfo) extracted;
if (genericTypeInfo.getTypeClass() == Utf8.class) {
return BasicTypeInfo.STRING_TYPE_INFO;
+   } else if (genericTypeInfo.getTypeClass() == Map.class) {
+   // avro map key is always string
+   return Types.MAP(Types.STRING,
+       convertPrimitiveType(schema.getValueType().getType()));
+   } else if (genericTypeInfo.getTypeClass() == List.class &&
+   schema.getType() == Schema.Type.ARRAY) {
--- End diff --

It is necessary because List.class doesn't imply that the Schema.Type is
ARRAY. But I think it would be better to base the check on Schema.Type.
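One concrete case where the Java class and the schema type disagree (an assumption about what is meant here, though a common one): a nullable array field is declared in Avro as the union `["null", {"type": "array", ...}]`. The generated field is still a `java.util.List`, yet the field schema's type is `UNION`, not `ARRAY`. The sketch below decides on the schema after unwrapping such a union; `SimpleSchema` is a hypothetical stand-in for `org.apache.avro.Schema`, whose union branches would come from `Schema.getTypes()`.

```java
import java.util.Arrays;
import java.util.List;

/**
 * Sketch: decide "is this an Avro array?" from the schema rather than from
 * List.class, unwrapping ["null", T] unions first. SimpleSchema is a
 * hypothetical stand-in for org.apache.avro.Schema.
 */
final class NullableUnions {

    enum Type { NULL, STRING, INT, ARRAY, MAP, RECORD, UNION }

    static final class SimpleSchema {
        final Type type;
        final List<SimpleSchema> branches;  // only used for UNION

        SimpleSchema(Type type, SimpleSchema... branches) {
            this.type = type;
            this.branches = Arrays.asList(branches);
        }
    }

    private NullableUnions() {}

    /** Returns the non-null branch of a ["null", T] union, or the schema itself. */
    static SimpleSchema unwrapNullable(SimpleSchema schema) {
        if (schema.type == Type.UNION && schema.branches.size() == 2) {
            SimpleSchema first = schema.branches.get(0);
            SimpleSchema second = schema.branches.get(1);
            if (first.type == Type.NULL) {
                return second;
            }
            if (second.type == Type.NULL) {
                return first;
            }
        }
        return schema;
    }

    /** True if the (possibly nullable) schema describes an Avro array. */
    static boolean isArray(SimpleSchema schema) {
        return unwrapNullable(schema).type == Type.ARRAY;
    }
}
```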


---


2018-06-05 Thread tragicjun
Github user tragicjun commented on a diff in the pull request:

https://github.com/apache/flink/pull/6082#discussion_r193101536
  
--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroRecordClassConverter.java ---
@@ -73,9 +75,37 @@ private AvroRecordClassConverter() {
final GenericTypeInfo genericTypeInfo = (GenericTypeInfo) extracted;
if (genericTypeInfo.getTypeClass() == Utf8.class) {
return BasicTypeInfo.STRING_TYPE_INFO;
+   } else if (genericTypeInfo.getTypeClass() == Map.class) {
+   // avro map key is always string
+   return Types.MAP(Types.STRING,
+       convertPrimitiveType(schema.getValueType().getType()));
--- End diff --

Do you actually mean the value **might** not be primitive?


---


2018-06-05 Thread tragicjun
Github user tragicjun commented on a diff in the pull request:

https://github.com/apache/flink/pull/6082#discussion_r193092151
  
--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroRecordClassConverter.java ---
@@ -73,9 +75,37 @@ private AvroRecordClassConverter() {
final GenericTypeInfo genericTypeInfo = (GenericTypeInfo) extracted;
if (genericTypeInfo.getTypeClass() == Utf8.class) {
return BasicTypeInfo.STRING_TYPE_INFO;
+   } else if (genericTypeInfo.getTypeClass() == Map.class) {
+   // avro map key is always string
+   return Types.MAP(Types.STRING,
+       convertPrimitiveType(schema.getValueType().getType()));
--- End diff --

This function requires "TypeInformation extracted, Schema schema", but we
can only get "org.apache.avro.Schema.Type" from the Avro MapSchema (value type)
and ArraySchema (element type).


---


2018-06-05 Thread tragicjun
Github user tragicjun commented on a diff in the pull request:

https://github.com/apache/flink/pull/6082#discussion_r193084082
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java ---
@@ -128,6 +130,82 @@ public void testDifferentFieldsAvroClass() {

source.getDataStream(StreamExecutionEnvironment.getExecutionEnvironment()).getType());
}
 
+   @Test
+   public void testHasMapFieldsAvroClass() {
--- End diff --

The issue is exposed when using KafkaAvroTableSource, but moving the unit 
tests to AvroRowDeSerializationSchemaTest should be fine.


---


2018-06-05 Thread tragicjun
Github user tragicjun commented on a diff in the pull request:

https://github.com/apache/flink/pull/6082#discussion_r193039834
  
--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroRecordClassConverter.java ---
@@ -73,9 +75,37 @@ private AvroRecordClassConverter() {
final GenericTypeInfo genericTypeInfo = (GenericTypeInfo) extracted;
if (genericTypeInfo.getTypeClass() == Utf8.class) {
return BasicTypeInfo.STRING_TYPE_INFO;
+   } else if (genericTypeInfo.getTypeClass() == Map.class) {
+   // avro map key is always string
+   return Types.MAP(Types.STRING,
+       convertPrimitiveType(schema.getValueType().getType()));
+   } else if (genericTypeInfo.getTypeClass() == List.class &&
+   schema.getType() == Schema.Type.ARRAY) {
+   return Types.LIST(convertPrimitiveType(schema.getElementType().getType()));
--- End diff --

Yes, org.apache.flink.table.api.Types doesn't support LIST, but
org.apache.flink.api.common.typeinfo.Types does. The Avro array type is
converted to a Java List type. Can we add LIST to
org.apache.flink.table.api.Types to support Avro arrays?


---


2018-06-05 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6082#discussion_r192771700
  
--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroRecordClassConverter.java ---
@@ -73,9 +75,37 @@ private AvroRecordClassConverter() {
final GenericTypeInfo genericTypeInfo = (GenericTypeInfo) extracted;
if (genericTypeInfo.getTypeClass() == Utf8.class) {
return BasicTypeInfo.STRING_TYPE_INFO;
+   } else if (genericTypeInfo.getTypeClass() == Map.class) {
+   // avro map key is always string
+   return Types.MAP(Types.STRING,
+       convertPrimitiveType(schema.getValueType().getType()));
+   } else if (genericTypeInfo.getTypeClass() == List.class &&
+   schema.getType() == Schema.Type.ARRAY) {
--- End diff --

Is this check necessary? If yes, why is it not necessary for Maps?


---


2018-06-05 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6082#discussion_r192770567
  
--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroRecordClassConverter.java ---
@@ -73,9 +75,37 @@ private AvroRecordClassConverter() {
final GenericTypeInfo genericTypeInfo = (GenericTypeInfo) extracted;
if (genericTypeInfo.getTypeClass() == Utf8.class) {
return BasicTypeInfo.STRING_TYPE_INFO;
+   } else if (genericTypeInfo.getTypeClass() == Map.class) {
+   // avro map key is always string
+   return Types.MAP(Types.STRING,
+       convertPrimitiveType(schema.getValueType().getType()));
--- End diff --

The value must not be primitive. Call this function recursively instead?
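A sketch of the recursive variant, using a hypothetical `Node` type in place of `org.apache.avro.Schema`. The key point is to recurse on the full nested schema (in Avro, `getValueType()` and `getElementType()` return a `Schema`, not just a `Schema.Type`), so map values and array elements can themselves be records, maps, or arrays. It returns a readable type string rather than Flink `TypeInformation` so that it runs without Flink on the classpath.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Sketch: recursive schema-to-type conversion. Node is a hypothetical
 * stand-in for org.apache.avro.Schema; the output is a type string, not
 * Flink TypeInformation.
 */
final class RecursiveConverter {

    enum Kind { STRING, INT, LONG, BOOLEAN, DOUBLE, MAP, ARRAY, RECORD }

    static final class Node {
        final Kind kind;
        final String name;        // record name, RECORD only
        final Node child;         // value type (MAP) or element type (ARRAY)
        final List<Node> fields;  // field schemas, RECORD only

        private Node(Kind kind, String name, Node child, List<Node> fields) {
            this.kind = kind;
            this.name = name;
            this.child = child;
            this.fields = fields;
        }

        static Node primitive(Kind kind) { return new Node(kind, null, null, new ArrayList<>()); }
        static Node map(Node value) { return new Node(Kind.MAP, null, value, new ArrayList<>()); }
        static Node array(Node element) { return new Node(Kind.ARRAY, null, element, new ArrayList<>()); }
        static Node record(String name, Node... fields) {
            return new Node(Kind.RECORD, name, null, Arrays.asList(fields));
        }
    }

    private RecursiveConverter() {}

    static String convert(Node schema) {
        switch (schema.kind) {
            case MAP:
                // Avro map keys are always strings; only the value type varies.
                return "MAP<STRING, " + convert(schema.child) + ">";
            case ARRAY:
                return "ARRAY<" + convert(schema.child) + ">";
            case RECORD:
                List<String> parts = new ArrayList<>();
                for (Node field : schema.fields) {
                    parts.add(convert(field));  // nested records recurse as well
                }
                return schema.name + "(" + String.join(", ", parts) + ")";
            default:
                return schema.kind.name();  // primitives map one-to-one
        }
    }
}
```

For example, a map whose values are arrays of an `Address` record with a string and an int field converts to `MAP<STRING, ARRAY<Address(STRING, INT)>>`.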


---


2018-06-05 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6082#discussion_r192771502
  
--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroRecordClassConverter.java ---
@@ -73,9 +75,37 @@ private AvroRecordClassConverter() {
final GenericTypeInfo genericTypeInfo = (GenericTypeInfo) extracted;
if (genericTypeInfo.getTypeClass() == Utf8.class) {
return BasicTypeInfo.STRING_TYPE_INFO;
+   } else if (genericTypeInfo.getTypeClass() == Map.class) {
+   // avro map key is always string
+   return Types.MAP(Types.STRING,
+       convertPrimitiveType(schema.getValueType().getType()));
+   } else if (genericTypeInfo.getTypeClass() == List.class &&
+   schema.getType() == Schema.Type.ARRAY) {
+   return Types.LIST(convertPrimitiveType(schema.getElementType().getType()));
--- End diff --

Call this function recursively. Btw, also update the method docs to describe
this behavior.


---


2018-06-05 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6082#discussion_r192965275
  
--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroRecordClassConverter.java ---
@@ -73,9 +75,37 @@ private AvroRecordClassConverter() {
final GenericTypeInfo genericTypeInfo = (GenericTypeInfo) extracted;
if (genericTypeInfo.getTypeClass() == Utf8.class) {
return BasicTypeInfo.STRING_TYPE_INFO;
+   } else if (genericTypeInfo.getTypeClass() == Map.class) {
--- End diff --

If you update this converter class, you should also update the 
corresponding runtime classes in 
`org.apache.flink.formats.avro.AvroRow(De)SerializationSchema`


---


2018-06-05 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6082#discussion_r192768664
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java ---
@@ -128,6 +130,82 @@ public void testDifferentFieldsAvroClass() {

source.getDataStream(StreamExecutionEnvironment.getExecutionEnvironment()).getType());
}
 
+   @Test
+   public void testHasMapFieldsAvroClass() {
--- End diff --

I don't think we need changes in Kafka-related classes. This is an issue
with the `AvroRowDeserializationSchema` and should be covered by the
`AvroRowDeSerializationSchemaTest`.


---


2018-06-05 Thread suez1224
Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6082#discussion_r192955182
  
--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroRecordClassConverter.java ---
@@ -73,9 +75,37 @@ private AvroRecordClassConverter() {
final GenericTypeInfo genericTypeInfo = (GenericTypeInfo) extracted;
if (genericTypeInfo.getTypeClass() == Utf8.class) {
return BasicTypeInfo.STRING_TYPE_INFO;
+   } else if (genericTypeInfo.getTypeClass() == Map.class) {
+   // avro map key is always string
+   return Types.MAP(Types.STRING,
+       convertPrimitiveType(schema.getValueType().getType()));
+   } else if (genericTypeInfo.getTypeClass() == List.class &&
+   schema.getType() == Schema.Type.ARRAY) {
+   return Types.LIST(convertPrimitiveType(schema.getElementType().getType()));
--- End diff --

I don't think Flink Table & SQL supports LIST; please see
org.apache.flink.table.api.Types.
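If the Table API cannot expose LIST, one option (an assumption, not something settled in this thread) is to declare Avro arrays as an object-array type (`Types.OBJECT_ARRAY`; check that the Flink version in use provides it) and convert the `java.util.List` produced by the Avro reader into a typed Java array at runtime in the deserialization schema. `ListToArray` is a hypothetical helper sketching only that runtime step; the per-element function could itself recurse into nested maps, arrays, or records.

```java
import java.lang.reflect.Array;
import java.util.List;
import java.util.function.Function;

/**
 * Hypothetical runtime helper: copies a java.util.List read by Avro into a
 * typed Java array, converting each non-null element with the given function.
 */
final class ListToArray {

    private ListToArray() {}

    static <T> T[] toArray(List<?> list, Class<T> elementClass,
                           Function<Object, ? extends T> convert) {
        // elementClass must be a reference type: for int.class, Array.newInstance
        // would create an int[] and the cast below would fail.
        @SuppressWarnings("unchecked")
        T[] result = (T[]) Array.newInstance(elementClass, list.size());
        int i = 0;
        for (Object element : list) {
            // null elements (nullable element schemas) stay null
            result[i++] = element == null ? null : convert.apply(element);
        }
        return result;
    }
}
```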


---