Thanks Guowei,

setting format.setReuseAvroValue(false) with 1.8.2-generated records does
not solve the problem.

12:02:59,314 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Discarding checkpoint 1 of job 46ea458aff2a496c4617a6b57e4de937.
java.lang.ClassCastException: java.lang.Long cannot be cast to org.joda.time.DateTime
    at com.tenx.client.generalledger.event.GlHeader.put(GlHeader.java:125)
    at org.apache.avro.generic.GenericData.setField(GenericData.java:690)
    at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:119)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
    at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
    at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
    at org.apache.flink.formats.avro.AvroInputFormat.nextRecord(AvroInputFormat.java:170)
    at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:323)
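
As far as I can read the trace, GlHeader.put() received the raw long and tried to cast it to DateTime. Here is a JDK-only sketch of that mechanism, under my own assumptions: HeaderLike and PutCastDemo are hypothetical stand-ins, not the real generated class or Avro's reader, and I use java.time.Instant instead of Joda to avoid extra dependencies.

```java
import java.time.Instant;

// Simplified, hypothetical stand-in for a generated specific record.
class HeaderLike {
    private Instant createdAt;

    // A generated-style put(): the value is cast to the field's Java type.
    void put(int field, Object value) {
        switch (field) {
            case 0:
                // Throws ClassCastException if a raw Long arrives here.
                createdAt = (Instant) value;
                break;
            default:
                throw new IndexOutOfBoundsException("Bad index: " + field);
        }
    }

    Instant getCreatedAt() {
        return createdAt;
    }
}

class PutCastDemo {
    // Mimics a reader that either applies the logical-type conversion
    // before calling put(), or hands over the raw long unchanged.
    static void readInto(HeaderLike record, long rawMillis, boolean applyConversion) {
        Object value;
        if (applyConversion) {
            value = Instant.ofEpochMilli(rawMillis);
        } else {
            value = Long.valueOf(rawMillis);
        }
        record.put(0, value);
    }
}
```

With applyConversion set to false, put() fails with the same kind of ClassCastException as in the trace; with true the field is set normally.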


----------------------------

Summarising, the only working combination seems to be:

   - Use AVRO 1.9.2 code generation, setting dateTimeLogicalTypeImplementation
   = joda
   - Enable object reuse (being careful about the implications)
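
To make the object-reuse implication concrete, here is a small JDK-only simulation of the hazard. It is not Flink's runtime: Event and ObjectReuseDemo are hypothetical names, and the loop only mimics a source that overwrites one shared instance while an operator remembers the previous element.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical mutable record that a reusing source overwrites in place.
class Event {
    long id;

    Event(long id) {
        this.id = id;
    }

    Event copy() {
        return new Event(id);
    }
}

class ObjectReuseDemo {
    // Emits ids 1..count through ONE reused Event instance and records what
    // an operator that cached the previous element believes its id was.
    static List<Long> previousIds(int count, boolean copyBeforeCaching) {
        List<Long> seen = new ArrayList<>();
        Event reused = new Event(0);
        Event last = null;
        for (long id = 1; id <= count; id++) {
            reused.id = id; // the "runtime" overwrites the shared instance
            if (last != null) {
                seen.add(last.id); // read the cached "previous" element
            }
            // Caching the reference aliases the reused instance;
            // caching a copy keeps the old value intact.
            last = copyBeforeCaching ? reused.copy() : reused;
        }
        return seen;
    }
}
```

For three elements, caching the reference yields [2, 3] instead of the expected [1, 2], because the cached "previous" element is the same instance that was just overwritten. Copying before caching gives [1, 2].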

Using AVRO 1.8.2 code generation does not work with any of the other
workarounds.
Using generic objects does not work because of a bug in AvroSerializer
<https://issues.apache.org/jira/browse/FLINK-18223>, and GenericRecords also
bring a number of other problems.

I am not very comfortable with using AVRO objects generated with a
different AVRO version than the one supported by Flink.
I am going to map AVRO records into hand-written POJOs immediately after
ingestion to reduce the chances of further issues. I reckon this is very
empirical, but that's what the workaround looks like to me :)
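
A minimal sketch of what such a mapping could look like, using only the JDK. LedgerEntry, LogicalTypeMapper and the field names are hypothetical and not taken from the real schema. It assumes the timestamp-millis value is available as epoch milliseconds and the decimal value as big-endian two's-complement unscaled bytes, with the scale taken from the schema.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.time.Instant;

// Hypothetical hand-written POJO; field names are illustrative only.
// In a real job this would be a public class in its own file.
class LedgerEntry {
    public Instant postedAt;
    public BigDecimal amount;

    public LedgerEntry() {
    }

    public LedgerEntry(Instant postedAt, BigDecimal amount) {
        this.postedAt = postedAt;
        this.amount = amount;
    }
}

class LogicalTypeMapper {
    // timestamp-millis: a long counting milliseconds since the Unix epoch (UTC).
    static Instant toInstant(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis);
    }

    // decimal: big-endian two's-complement unscaled value; scale from the schema.
    static BigDecimal toBigDecimal(byte[] unscaledBytes, int scale) {
        return new BigDecimal(new BigInteger(unscaledBytes), scale);
    }

    // Builds the POJO from the raw values read out of the Avro record.
    static LedgerEntry map(long postedAtMillis, byte[] amountBytes, int amountScale) {
        return new LedgerEntry(toInstant(postedAtMillis), toBigDecimal(amountBytes, amountScale));
    }
}
```

With the Joda-based generated classes, the milliseconds would presumably come from DateTime#getMillis(). The mapping would run in the first map step after the source, so that no Avro type travels further down the pipeline.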

Lorenzo

P.S. I want to give a massive thanks to this community. So far it has been
one of the most responsive and helpful I have ever interacted with.

On Thu, 11 Jun 2020 at 10:25, Guowei Ma <guowei....@gmail.com> wrote:

> Hi,
> for the 1.8.2 case (case 1) you could try format.setReuseAvroValue(false);
>
> Best,
> Guowei
>
>
> On Thu, 11 Jun 2020 at 17:02, Lorenzo Nicora <lorenzo.nic...@gmail.com> wrote:
>
>> Hi Arvid,
>>
>> thanks for the point about caching records. Gotcha!
>>
>> Sorry, I cannot share the full schema or generated code. It's 3rd-party
>> IP and we signed a meter-thick NDA... I think I can post snippets.
>> The schema is heavily nested, including arrays of other record types.
>> Types are primitives, or logical decimal and timestamp-millis. No unions.
>>
>> #conversion is in AccountEntries only (one of the nested records) and
>> looks like this:
>>
>> private static final org.apache.avro.Conversion<?>[] conversions =
>>     new org.apache.avro.Conversion<?>[] {
>>     null,
>>     null,
>>     null,
>>     new org.apache.avro.data.JodaTimeConversions.TimestampConversion(),
>>     new org.apache.avro.data.JodaTimeConversions.TimestampConversion(),
>>     new org.apache.avro.data.JodaTimeConversions.TimestampConversion(),
>>     null,
>>     null,
>>     null,
>>     null,
>>     null,
>>     null,
>>     null
>> };
>>
>>
>> Note that I have to generate the specific object with the AVRO 1.9.2 Maven
>> plugin.
>> With 1.8.2-generated code it fails with the following exception,
>> regardless of whether enableObjectReuse() is set:
>>
>> java.lang.ClassCastException: java.lang.Long cannot be cast to org.joda.time.DateTime
>>     at com.tenx.client.generalledger.event.GlHeader.put(GlHeader.java:125)
>>     at org.apache.avro.generic.GenericData.setField(GenericData.java:690)
>>     at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:119)
>>     at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>     at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
>>     at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
>>     at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
>>     at org.apache.flink.formats.avro.AvroInputFormat.nextRecord(AvroInputFormat.java:165)
>>     at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:323)
>>
>>
>> Thanks for the help
>> Lorenzo
>>
>>
>> On Thu, 11 Jun 2020 at 08:58, Arvid Heise <ar...@ververica.com> wrote:
>>
>>> Hi Lorenzo,
>>>
>>> I'm glad that it worked out somehow, but I'd still like to understand
>>> what went wrong, so it will work more smoothly for future users. I double
>>> checked and we even test AvroSerializer with logical types, so I'm a bit
>>> puzzled.
>>>
>>> Could you attach GlHeader or at least show us what GlHeader#conversions
>>> looks like? I want to exclude the possibility that the source generator
>>> screwed up.
>>>
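>>> Concerning object reuse: you need to treat all POJOs as immutable
>>> (I'm assuming that's what you meant in your description), but you
>>> should also never cache values, as in:
>>> import org.apache.flink.api.common.functions.FlatMapFunction;
>>> import org.apache.flink.util.Collector;
>>>
>>> class ShiftElements implements FlatMapFunction<Object, Object> {
>>>   private Object lastElement;
>>>
>>>   @Override
>>>   public void flatMap(Object newElement, Collector<Object> out) {
>>>     if (lastElement != null) {
>>>       out.collect(lastElement); // emit the previously seen element
>>>     }
>>>     lastElement = newElement; // <- never cache with enableObjectReuse
>>>   }
>>> }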
>>>
>>> (excuse my ugly code)
>>>
>>> On Thu, Jun 11, 2020 at 9:25 AM Lorenzo Nicora <lorenzo.nic...@gmail.com>
>>> wrote:
>>>
>>>> Hi Arvid,
>>>>
>>>> answering your other questions.
>>>>
>>>> Here is the stack trace for case (1), when I try to read using
>>>> specific records generated by the AVRO 1.8.2 plugin:
>>>>
>>>> java.lang.ClassCastException: java.lang.Long cannot be cast to org.joda.time.DateTime
>>>>     at com.tenx.client.generalledger.event.GlHeader.put(GlHeader.java:125)
>>>>     at org.apache.avro.generic.GenericData.setField(GenericData.java:690)
>>>>     at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:119)
>>>>     at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>>>     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>>>     at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
>>>>     at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>>>     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>>>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
>>>>     at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
>>>>     at org.apache.flink.formats.avro.AvroInputFormat.nextRecord(AvroInputFormat.java:165)
>>>>     at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:323)
>>>>
>>>>
>>>> I also tried generating the specific object with AVRO 1.9.2 (2), forcing
>>>> it to use Joda time, but it still didn't work:
>>>>
>>>> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
>>>>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
>>>>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
>>>>     at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:325)
>>>> Caused by: org.apache.avro.AvroRuntimeException: Unknown datum type org.joda.time.DateTime: 2020-06-01T02:00:42.326Z
>>>>     at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:909)
>>>>     at org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:420)
>>>>     at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:871)
>>>>     at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1302)
>>>>     at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>>>>     at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1283)
>>>>     at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>>>>     at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1283)
>>>>     at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>>>>     at org.apache.flink.formats.avro.typeutils.AvroSerializer.copy(AvroSerializer.java:242)
>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
>>>>     ... 7 more
>>>>
>>>>
>>>> But in the second case, it seems the failure happens when Flink tries
>>>> to make a copy of the record.
>>>> So I followed your suggestion of enableObjectReuse() and *IT WORKS!*
>>>>
>>>> I am not sure I understand all the implications of object reuse in
>>>> Flink specifically.
>>>> I am familiar with the general risk of mutable messages, and I always
>>>> handle them as immutable even when they are POJOs, never mutating and
>>>> forwarding the same record.
>>>> I am not sure whether there are other implications in Flink.
>>>>
>>>> Many thanks
>>>> Lorenzo
>>>>
>>>>
>>>> On Wed, 10 Jun 2020 at 17:52, Arvid Heise <ar...@ververica.com> wrote:
>>>>
>>>>> Hi Lorenzo,
>>>>>
>>>>> 1) I'm surprised that this doesn't work. I'd like to see that
>>>>> stacktrace.
>>>>>
>>>>> 2) cannot work like this, because we bundle Avro 1.8.2. You could
>>>>> retest with dateTimeLogicalType='Joda' set, but then you will
>>>>> probably see the same issue as 1)
>>>>>
>>>>> 3) I'm surprised that this doesn't work either. There is a codepath
>>>>> since 2016 for GenericRecord and it's covered in a test. From the error
>>>>> description and the ticket, it looks like the issue is not the
>>>>> AvroInputFormat, but the serializer. So it would probably work with a
>>>>> different serializer (but that would cause back and forth type
>>>>> transformation).
>>>>>
>>>>> On Wed, Jun 10, 2020 at 4:31 PM Lorenzo Nicora <
>>>>> lorenzo.nic...@gmail.com> wrote:
>>>>>
>>>>>> Thanks Timo,
>>>>>>
>>>>>> the stack trace with the 1.9.2-generated specific record is the following:
>>>>>>
>>>>>> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>>>>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>>>>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
>>>>>>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
>>>>>>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
>>>>>>     at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:325)
>>>>>> Caused by: org.apache.avro.AvroRuntimeException: Unknown datum type java.time.Instant: 2020-06-01T02:00:42.105Z
>>>>>>     at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:909)
>>>>>>     at org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:420)
>>>>>>     at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:871)
>>>>>>     at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1302)
>>>>>>     at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>>>>>>     at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1283)
>>>>>>     at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>>>>>>     at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1283)
>>>>>>     at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>>>>>>     at org.apache.flink.formats.avro.typeutils.AvroSerializer.copy(AvroSerializer.java:242)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
>>>>>>     ... 7 more
>>>>>>
>>>>>>
>>>>>> I reckon logical types might have been considered somehow
>>>>>> experimental since...ever. But, honestly, I've been using them in the
>>>>>> Kafka/Java ecosystem as well as in Spark without too many problems.
>>>>>>
>>>>>> For my specific use case, the schema is given. Messages are produced
>>>>>> by a 3rd party and we cannot change the schema (especially because it's a
>>>>>> legit schema).
>>>>>> I am desperately looking for a workaround.
>>>>>>
>>>>>> I had a similar issue with a Kafka source and AVRO records
>>>>>> containing decimals and timestamps. Timestamps worked but decimals did not.
>>>>>> I was able to work around the problem using GenericRecords.
>>>>>> But the Kafka source relies on AvroDeserializationSchema rather than
>>>>>> AvroSerializer, and has no problem handling GenericRecords.
>>>>>>
>>>>>> I honestly find it very confusing to have different ways of handling
>>>>>> AVRO deserialization inside Flink core components.
>>>>>>
>>>>>> Cheers
>>>>>> Lorenzo
>>>>>>
>>>>>>
>>>>>> On Wed, 10 Jun 2020 at 15:02, Timo Walther <twal...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Lorenzo,
>>>>>>>
>>>>>>> as far as I know we don't support Avro's logical types in Flink's
>>>>>>> AvroInputFormat yet. E.g. AvroRowDeserializationSchema [1] supports the
>>>>>>> 1.8.2 version of logical types but might be incompatible with 1.9.2.
>>>>>>>
>>>>>>> Reg 2) Specific record generated with AVRO 1.9.2 plugin:
>>>>>>>
>>>>>>> Could you send us the full stack trace? I think this should actually
>>>>>>> work, because specific records are handled as POJOs and those should be
>>>>>>> able to also deal with logical types' classes through Kryo.
>>>>>>>
>>>>>>> Reg 3) Generic record
>>>>>>>
>>>>>>> It would be great if we could make this option possible. We could include
>>>>>>> a fix in the next minor release.
>>>>>>>
>>>>>>> Sorry for the bad user experience. But IMHO logical types are still
>>>>>>> experimental in Avro. Maybe 1.9.2 has finally fixed the biggest
>>>>>>> shortcomings such that Flink can properly support them as well.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Timo
>>>>>>>
>>>>>>> [1]
>>>>>>>
>>>>>>> https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
>>>>>>>
>>>>>>>
>>>>>>> On 10.06.20 15:08, Lorenzo Nicora wrote:
>>>>>>> > Hi,
>>>>>>> >
>>>>>>> > I need to continuously ingest AVRO files as they arrive.
>>>>>>> > Files are written by an S3 Sink Kafka Connect but S3 is not the point
>>>>>>> > here. I started trying to ingest a static bunch of files from local fs
>>>>>>> > first and I am having weird issues with AVRO deserialization.
>>>>>>> >
>>>>>>> > I should mention that the records contain logical types:
>>>>>>> > timestamp-millis and decimals.
>>>>>>> >
>>>>>>> > To keep it simple, I extracted the AVRO schema from the data files and
>>>>>>> > used avro-maven-plugin to generate POJOs.
>>>>>>> > I tried multiple combinations, all with no luck.
>>>>>>> >
>>>>>>> > 1) Specific record generated with AVRO 1.8.2 plugin
>>>>>>> >
>>>>>>> > Path in = new Path(sourceBasePath);
>>>>>>> > AvroInputFormat<AccountEntries> inputFormat =
>>>>>>> >     new AvroInputFormat<>(in, AccountEntries.class);
>>>>>>> > DataStream<AccountEntries> accountEntries = env
>>>>>>> >     .readFile(inputFormat, sourceBasePath,
>>>>>>> >         FileProcessingMode.PROCESS_CONTINUOUSLY, FILE_SCAN_INTERVAL_MS);
>>>>>>> >
>>>>>>> > *Result*
>>>>>>> > java.lang.ClassCastException: java.lang.Long cannot be cast to
>>>>>>> > org.joda.time.DateTime
>>>>>>> > (IIRC this is a known AVRO 1.8.2 issue)
>>>>>>> >
>>>>>>> >
>>>>>>> > 2) Specific record generated with AVRO 1.9.2 plugin
>>>>>>> > Same code as above but AVRO POJOs are generated with AVRO 1.9.2
>>>>>>> >
>>>>>>> > *Result*
>>>>>>> > org.apache.avro.AvroRuntimeException: Unknown datum type java.time.Instant
>>>>>>> >
>>>>>>> >
>>>>>>> > 3) Generic record
>>>>>>> > I am getting the Schema from the generated specific record, for
>>>>>>> > convenience, but I am not using the generated POJO as record.
>>>>>>> > I also followed the suggestions in this Flink blog post
>>>>>>> > <https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html#avro-generic>
>>>>>>> > to explicitly specify the TypeInfo with returns(...)
>>>>>>> >
>>>>>>> > Path in = new Path(config.sourceFileSystemPath);
>>>>>>> > Schema schema = AccountEntries.getClassSchema();
>>>>>>> > AvroInputFormat<GenericRecord> inputFormat =
>>>>>>> >     new AvroInputFormat<>(in, GenericRecord.class);
>>>>>>> > DataStream<GenericRecord> accountEntries = env
>>>>>>> >     .readFile(inputFormat, config.sourceFileSystemPath,
>>>>>>> >         FileProcessingMode.PROCESS_CONTINUOUSLY, FILE_SCAN_INTERVAL_MS)
>>>>>>> >     .returns(new GenericRecordAvroTypeInfo(schema));
>>>>>>> >
>>>>>>> >
>>>>>>> > *Result*
>>>>>>> > The class 'org.apache.avro.generic.GenericRecord' is not instantiable:
>>>>>>> > The class is not a proper class. It is either abstract, an interface, or
>>>>>>> > a primitive type.
>>>>>>> >
>>>>>>> > This looks like a bug.
>>>>>>> > I raised the ticket <https://issues.apache.org/jira/browse/FLINK-18223>
>>>>>>> > and I will try to submit a fix, but that still does not solve my problem,
>>>>>>> > as I am using a managed Flink that I cannot update.
>>>>>>> > I cannot believe there is no workaround. I do not think I'm trying to do
>>>>>>> > anything bizarre. Am I?
>>>>>>> >
>>>>>>> > Any ideas?
>>>>>>> > Am I missing something obvious?
>>>>>>> >
>>>>>>> > Cheers
>>>>>>> > Lorenzo
>>>>>>>
>>>>>>>
>>>>>
>>>>> --
>>>>>
>>>>> Arvid Heise | Senior Java Developer
>>>>>
>>>>> <https://www.ververica.com/>
>>>>>
>>>>> Follow us @VervericaData
>>>>>
>>>>> --
>>>>>
>>>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>>>>> Conference
>>>>>
>>>>> Stream Processing | Event Driven | Real Time
>>>>>
>>>>> --
>>>>>
>>>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>>>
>>>>> --
>>>>> Ververica GmbH
>>>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason,
>>>>> Ji (Toni) Cheng
>>>>>
>>>>
>>>
>>
