I want to register the result schema in a schema registry, as I am pushing the 
result data to a Kafka topic. The result schema is not known at compile time, 
so I need a way to compute it at runtime from the resulting Flink schema.
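
For the registration step itself, this minimal sketch is roughly what I have 
in mind (assuming the Confluent schema-registry client is on the classpath; 
the URL and the subject name are placeholders):

import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

public class RegisterResultSchema {

    // 'converted' is the org.apache.avro.Schema computed at runtime from the
    // table's ResolvedSchema (see the AvroSchemaConverter snippet below).
    static int register(org.apache.avro.Schema converted) throws Exception {
        // Placeholder URL; the subject follows the usual "<topic>-value"
        // TopicNameStrategy convention.
        SchemaRegistryClient client =
                new CachedSchemaRegistryClient("http://localhost:8081", 100);
        return client.register("result-topic-value", new AvroSchema(converted));
    }
}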

-Theo

(resent - sorry again, I forgot to add the others in cc)

> On 9. Nov 2022, at 14:59, Andrew Otto <o...@wikimedia.org> wrote:
> 
> >  I want to convert the schema of a Flink table to both Protobuf schema and 
> > JSON schema
> Oh, you want to convert from Flink Schema TO JSONSchema?  Interesting.  That 
> would indeed be something that is not usually done.  Just curious, why do you 
> want to do this?
> 
> On Wed, Nov 9, 2022 at 8:46 AM Andrew Otto <o...@wikimedia.org 
> <mailto:o...@wikimedia.org>> wrote:
> Hello! 
> 
> I see you are talking about JSONSchema, not just JSON itself.
> 
> We're trying to do a similar thing at Wikimedia and have developed some 
> tooling around this.  
> 
> JsonSchemaFlinkConverter 
> <https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities-flink/src/main/java/org/wikimedia/eventutilities/flink/formats/json/JsonSchemaFlinkConverter.java>
>  has some logic to convert from JSONSchema Jackson ObjectNodes to Flink Table 
> DataType or Table SchemaBuilder, or Flink DataStream TypeInformation[Row].  
> Some of the conversions from JSONSchema to Flink type are opinionated.  You 
> can see the mappings here 
> <https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities-flink/src/main/java/org/wikimedia/eventutilities/flink/formats/json/DataTypeSchemaConversions.java>.
> 
> On Wed, Nov 9, 2022 at 2:33 AM Theodor Wübker <theo.wueb...@inside-m2m.de 
> <mailto:theo.wueb...@inside-m2m.de>> wrote:
> Thanks for your reply, Yaroslav! The way I do it with Avro seems similar to 
> what you pointed out:
> 
> ResolvedSchema resultSchema = resultTable.getResolvedSchema();
> DataType type = resultSchema.toSinkRowDataType();
> org.apache.avro.Schema converted =
>         AvroSchemaConverter.convertToSchema(type.getLogicalType());
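> 
> (For completeness, the imports this relies on - assuming the flink-avro 
> module is on the classpath:)
> 
> import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
> import org.apache.flink.table.catalog.ResolvedSchema;
> import org.apache.flink.table.types.DataType;
> 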
> I mentioned the ResolvedSchema because it is my starting point after the SQL 
> operation. It seemed to me that I cannot retrieve anything with more schema 
> information from the table, so I settled on this. About your other answers: 
> it seems the classes you mentioned are for serializing actual data? That is 
> not quite what I want to do.
> Essentially I want to convert the schema of a Flink table to both a Protobuf 
> schema and a JSON schema (for Avro, as you can see, I already have it). It 
> seems odd that this is not easily possible, because the opposite direction 
> works: converting from a JSON schema to a Flink schema is possible using the 
> JsonRowSchemaConverter. The reverse just does not seem to be implemented. 
> This is how I got a table Schema (that I can use in a TableDescriptor) from 
> a JSON schema:
> 
> TypeInformation<Row> type = JsonRowSchemaConverter.convert(json);
> DataType row = TableSchema.fromTypeInfo(type).toPhysicalRowDataType();
> Schema schema = Schema.newBuilder().fromRowDataType(row).build();
> Sidenote: I use deprecated methods here, so if there is a better approach, 
> please let me know! But it shows that in Flink it is easy to create a Schema 
> for a TableDescriptor from a JSON schema; the other way around is just not 
> so trivial, it seems. And for Protobuf I have no solution so far, not even 
> for creating a Flink Schema from a Protobuf schema, let alone the other way 
> around. A rough sketch of what I have in mind for the JSON direction follows 
> below.
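> 
> Since nothing built-in seems to exist for the Flink-to-JSONSchema direction, 
> here is a hand-rolled sketch: a recursive walk over the table's LogicalType 
> that emits a JSON-Schema ObjectNode via Jackson. The class name and the type 
> mappings are my own guesses, not an established API, and most types are left 
> as a TODO:
> 
> import com.fasterxml.jackson.databind.ObjectMapper;
> import com.fasterxml.jackson.databind.node.ObjectNode;
> import org.apache.flink.table.types.logical.*;
> 
> public class LogicalTypeToJsonSchema {
> 
>     private static final ObjectMapper MAPPER = new ObjectMapper();
> 
>     public static ObjectNode convert(LogicalType type) {
>         ObjectNode node = MAPPER.createObjectNode();
>         if (type instanceof RowType) {
>             node.put("type", "object");
>             ObjectNode props = node.putObject("properties");
>             for (RowType.RowField field : ((RowType) type).getFields()) {
>                 props.set(field.getName(), convert(field.getType()));
>             }
>         } else if (type instanceof ArrayType) {
>             node.put("type", "array");
>             node.set("items", convert(((ArrayType) type).getElementType()));
>         } else if (type instanceof CharType || type instanceof VarCharType) {
>             node.put("type", "string");
>         } else if (type instanceof BooleanType) {
>             node.put("type", "boolean");
>         } else if (type instanceof IntType || type instanceof BigIntType) {
>             node.put("type", "integer");
>         } else if (type instanceof FloatType || type instanceof DoubleType) {
>             node.put("type", "number");
>         } else {
>             // TODO: temporal, decimal, map and the remaining types.
>             throw new UnsupportedOperationException(type.asSummaryString());
>         }
>         return node;
>     }
> }
> 
> The entry point would then be 
> convert(resultSchema.toSinkRowDataType().getLogicalType()).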
> 
> -Theo
> 
> (resent because I accidentally only responded to you, not the mailing list - 
> sorry)
> 
