Thanks, guys. Since Flink 1.12 is not ready yet (e.g. it is not yet available
in the Maven repo), I need to stick to 1.11.

Dawid,

Regarding the code that throws "java.lang.Long cannot be cast to java.time.Instant":

The Avro schema has:
union { null, timestamp_ms } eventTime = null;

The generated Avro POJO does register the logical type conversion:

  private static SpecificData MODEL$ = new SpecificData();
  static {
    MODEL$.addLogicalTypeConversion(
        new org.apache.avro.data.TimeConversions.TimestampMillisConversion());
  }

I don't see the SpecificRecord#getConversions() method you mentioned anywhere in the Avro repo.

The generated put() is where it throws:

public void put(int field$, java.lang.Object value$) {
  switch (field$) {
  case 3: eventTime = (java.time.Instant) value$; break; // throws here
  }
}

I will send you the full avdl and POJO offline for a closer look.
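
In the meantime, if I read your earlier comment about SpecificRecordBase#getConversion()
correctly, the conversion also has to be exposed per field by the generated class itself,
not only registered on MODEL$. Below is a hand-written sketch (not the actual generated
code; field index 3 for eventTime matches the put() excerpt above, but the total number
of fields is my assumption) of the members I would expect inside MetadataRecord:

  // Sketch only: one slot per field, non-null where the field has a logical type.
  private static final org.apache.avro.Conversion<?>[] conversions =
      new org.apache.avro.Conversion<?>[] {
      null,
      null,
      null,
      new org.apache.avro.data.TimeConversions.TimestampMillisConversion(), // eventTime (field 3)
      null
  };

  // Looked up per field by SpecificDatumReader#readField() when the record
  // is a SpecificRecordBase.
  @Override
  public org.apache.avro.Conversion<?> getConversion(int field) {
      return conversions[field];
  }

As far as I can tell, if getConversion(3) returns null, SpecificDatumReader takes the
readWithoutConversion() path and hands the raw Long straight to put(), which matches
the stack trace I posted earlier. I'll verify against the generated file.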


Regards

Lian




On Mon, Sep 21, 2020 at 12:31 AM Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi All,
>
> Avro was finally bumped in
> https://issues.apache.org/jira/browse/FLINK-18192.
>
> The implementers didn't see
> https://issues.apache.org/jira/browse/FLINK-12532, but it is also
> updated now.
>
> Best,
> Aljoscha
>
> On 21.09.20 08:04, Arvid Heise wrote:
> > Hi Lian,
> >
> > we had a similar discussion on [1].
> >
> > TL;DR you are using Avro 1.9.x while Flink still bundles Avro 1.8 [2]
> until
> > Hive bumps it [3]. In the thread, I gave some options to avoid running
> into
> > the issue.
> > The easiest fix is to use Avro 1.8.2 all the way, but you may run into
> [4]
> > if your logical type is nullable (which is not necessary in most cases).
> >
> > Still, I think it's time for us to revise the decision to wait for Hive
> to
> > bump and rather upgrade independently. Avro was for a long time stuck on
> > 1.8 but the project gained traction again in the past two years. On the
> > other hand, Hive seems to be rather slow to respond to that and we
> > shouldn't have a slow-moving component block us from supporting a fast-moving
> > component when it's so apparent that users want it.
> > @Aljoscha Krettek <aljos...@apache.org> could you please pick that
> topic up
> > and ping the respective maintainers?
> >
> > [1]
> >
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Reading-from-AVRO-files-td35850.html
> > [2] https://issues.apache.org/jira/browse/FLINK-12532
> > [3] https://issues.apache.org/jira/browse/HIVE-21737
> > [4] https://issues.apache.org/jira/browse/AVRO-1891
> >
> > On Sun, Sep 20, 2020 at 9:56 PM Lian Jiang <jiangok2...@gmail.com>
> wrote:
> >
> >> Thanks Dawid for proposing ConfluentRegistryDeserializationSchema. I am
> >> trying ConfluentRegistryAvroDeserializationSchema (if this is what you
> >> mean) but got "java.lang.Long cannot be cast to java.time.Instant". This
> >> may be caused by https://issues.apache.org/jira/browse/FLINK-11030.
> >> Is there any
> progress
> >> for this JIRA? Thanks. Regards!
> >>
> >>
> >> Stacktrace:
> >> java.lang.ClassCastException: java.lang.Long cannot be cast to
> >> java.time.Instant
> >> at com.mycompany.mypayload.MetadataRecord.put(MetadataRecord.java:136)
> >> at org.apache.avro.generic.GenericData.setField(GenericData.java:795)
> >> at
> >>
> org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:139)
> >> at
> >>
> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237)
> >> at
> >>
> org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
> >> at
> >>
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:170)
> >> at
> >>
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
> >> at
> >>
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:178)
> >> at
> >>
> org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:136)
> >> at
> >>
> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237)
> >> at
> >>
> org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
> >> at
> >>
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:170)
> >> at
> >>
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
> >> at
> >>
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)
> >> at
> >>
> org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:74)
> >> at
> >>
> com.mycompany.serde.SpecificRecordSerDe.deserialize(SpecificRecordSerDe.java:89)
> >> at
> >>
> com.mycompany.serde.SpecificRecordSerDe.deserialize(SpecificRecordSerDe.java:16)
> >> at
> >>
> org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema.deserialize(KafkaDeserializationSchema.java:80)
> >> at
> >>
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
> >> at
> >>
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
> >> at
> >>
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
> >> at
> >>
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> >> at
> >>
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> >> at
> >>
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
> >>
> >>
> >>
> >> Code:
> >>
> >> import org.apache.avro.specific.SpecificRecord;
> >> import org.apache.flink.api.common.typeinfo.TypeInformation;
> >> import
> org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
> >> import
> org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema;
> >> import org.apache.flink.streaming.connectors.kafka.KafkaContextAware;
> >> import
> org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
> >> import
> org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
> >> import org.apache.kafka.clients.consumer.ConsumerRecord;
> >> import org.apache.kafka.clients.producer.ProducerRecord;
> >>
> >> import javax.annotation.Nullable;
> >> import java.io.Serializable;
> >>
> >> public class SpecificRecordSerDe<T extends SpecificRecord> implements
> >>          KafkaSerializationSchema<T>, KafkaContextAware<T>,
> KafkaDeserializationSchema<T>, Serializable {
> >>
> >>      private final Class<T> tClass;
> >>      private String topic; // for serializer
> >>      private String subject; // for serializer
> >>      private final String schemaRegistryUrl;
> >>      private ConfluentRegistryAvroSerializationSchema<T> serializer;
> >>      private ConfluentRegistryAvroDeserializationSchema<T> deserializer;
> >>
> >>      private static final Object lock = new Object();
> >>
> >>      public static <T> SpecificRecordSerDe forDeserializer(final
> Class<T> tClass, String schemaRegistryUrl) {
> >>          return new SpecificRecordSerDe(tClass, schemaRegistryUrl);
> >>      }
> >>
> >>      public static <T> SpecificRecordSerDe forSerializer(final Class<T>
> tClass,
> >>                                                          String
> schemaRegistryUrl,
> >>                                                          final String
> topic,
> >>                                                          final String
> subject) {
> >>          return new SpecificRecordSerDe(tClass, schemaRegistryUrl,
> topic, subject);
> >>      }
> >>
> >>      private SpecificRecordSerDe(final Class<T> tClass, String
> schemaRegistryUrl) {
> >>          this.tClass = tClass;
> >>          this.schemaRegistryUrl = schemaRegistryUrl;
> >>      }
> >>
> >>      private SpecificRecordSerDe(final Class<T> tClass,
> >>                                  final String schemaRegistryUrl,
> >>                                  final String topic,
> >>                                  final String subject) {
> >>          this(tClass, schemaRegistryUrl);
> >>          this.topic = topic;
> >>          this.subject = subject;
> >>      }
> >>
> >>      @Override
> >>      public ProducerRecord<byte[], byte[]> serialize(T element,
> @Nullable Long timestamp) {
> >>          if (this.serializer == null) {
> >>              synchronized (lock) {
> >>                  if (this.serializer == null) {
> >>                      this.serializer =
> ConfluentRegistryAvroSerializationSchema
> >>                              .forSpecific(tClass, this.subject,
> this.schemaRegistryUrl);
> >>                  }
> >>              }
> >>          }
> >>
> >>          byte[] bytes = this.serializer.serialize(element);
> >>          return new ProducerRecord<>(this.topic, bytes);
> >>      }
> >>
> >>      public boolean isEndOfStream(T nextElement) {
> >>          return false;
> >>      }
> >>
> >>      @Override
> >>      public T deserialize(ConsumerRecord<byte[], byte[]> record) throws
> Exception {
> >>          if (deserializer == null) {
> >>              synchronized (lock) {
> >>                  if (deserializer == null) {
> >>                      deserializer =
> ConfluentRegistryAvroDeserializationSchema
> >>                              .forSpecific(tClass,
> this.schemaRegistryUrl);
> >>                  }
> >>              }
> >>          }
> >>
> >>          return deserializer.deserialize(record.value());
> >>      }
> >>
> >>      @Override
> >>      public String getTargetTopic(T element) {
> >>          return this.topic;
> >>      }
> >>
> >>      @Override
> >>      public TypeInformation<T> getProducedType() {
> >>          return TypeInformation.of(tClass);
> >>      }
> >> }
> >>
> >>
> >>
> >>
> >> On Thu, Sep 17, 2020 at 9:42 AM Dawid Wysakowicz <
> dwysakow...@apache.org>
> >> wrote:
> >>
> >>> Thanks for the update.
> >>>
> >>> First of all, why did you decide to build your own
> DeserializationSchema
> >>> instead of using ConfluentRegistryDeserializationSchema? Your
> >>> implementation is quite inefficient: you do deserialize > serialize >
> >>> deserialize. Serialization/deserialization is usually one of the
> heaviest
> >>> operations in the pipeline.
> >>>
> >>> What do you return in your getProducedType? From the stack trace I
> guess
> >>> you are instantiating the AvroTypeInfo? Could you maybe share a full
> >>> runnable example? It would make it much easier to help you.
> >>>
> >>> Moreover the pattern with registering custom conversions in a
> >>> SpecificData will not work with AvroSerializer. Custom serializers
> should
> >>> be defined in the generated SpecificRecord (in your case
> PayloadRecord) in
> >>> the SpecificRecordBase#getConversion().
> >>>
> >>> Best,
> >>>
> >>> Dawid
> >>>
> >>>
> >>> On 17/09/2020 16:34, Lian Jiang wrote:
> >>>
> >>> Piotr/Dawid,
> >>>
> >>> Thanks for the reply. FLINK-18223 seems not to be related to this issue
> and
> >>> I double checked that I am using Flink 1.11.0 instead of 1.10.0. My
> >>> mistake. StreamExecutionEnvironment#getConfig()#enableObjectReuse()
> solved
> >>> the issue.
> >>>
> >>> I am not using ConfluentRegistryDeserializationSchema. Instead, I am
> >>> creating custom DeserializationSchema:
> >>>
> >>>
> >>> /*
> >>> the deser class
> >>> */
> >>> public class SpecificRecordSerDe<T extends SpecificRecord> implements
> >>>         KafkaSerializationSchema<T>, KafkaContextAware<T>,
> >>>         KafkaDeserializationSchema<T>, Serializable {
> >>>
> >>>     private final Class<T> tClass;
> >>>     private final String tSchemaStr;
> >>>     private volatile transient Schema tSchema;
> >>>     private String topic;
> >>>     private String schemaRegistryUrl;
> >>>     private KafkaAvroSerializer serializer;
> >>>     private KafkaAvroDecoder decoder;
> >>>
> >>>     public SpecificRecordSerDe(final Class<T> tClass, String tSchemaStr,
> >>>                                String schemaRegistryUrl) {
> >>>         this.tClass = tClass;
> >>>         this.tSchemaStr = tSchemaStr;
> >>>         this.topic = null;
> >>>         this.schemaRegistryUrl = schemaRegistryUrl;
> >>>     }
> >>>
> >>>     @Override
> >>>     public T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
> >>>         CachedSchemaRegistryClient client = new CachedSchemaRegistryClient(
> >>>                 schemaRegistryUrl,
> >>>                 4);
> >>>         decoder = new KafkaAvroDecoder(client);
> >>>         GenericRecord generic = (GenericRecord) decoder.fromBytes(record.value());
> >>>         DatumWriter<GenericRecord> writer = new SpecificDatumWriter<>(
> >>>                 generic.getSchema(), ManagedSpecificData.getForClass(tClass));
> >>>         ByteArrayOutputStream out = new ByteArrayOutputStream();
> >>>         Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
> >>>         writer.write(generic, encoder);
> >>>         encoder.flush();
> >>>
> >>>         byte[] avroData = out.toByteArray();
> >>>         out.close();
> >>>         tSchema = new org.apache.avro.Schema.Parser().parse(tSchemaStr);
> >>>         SpecificDatumReader<T> reader = new SpecificDatumReader<>(
> >>>                 generic.getSchema(), tSchema,
> >>>                 ManagedSpecificData.getForClass(tClass));
> >>>         Decoder anotherDecoder = DecoderFactory.get().binaryDecoder(avroData, null);
> >>>         T res = reader.read(null, anotherDecoder);
> >>>
> >>>         return res;
> >>>     }
> >>> }
> >>>
> >>>
> >>> /*
> >>> the specificData class
> >>> */
> >>> public class ManagedSpecificData extends SpecificData {
> >>>
> >>>     private static ManagedSpecificData getManagedSpecificData() {
> >>>         ManagedSpecificData res = new ManagedSpecificData();
> >>>
> >>>         registerAdvancedType(new TimestampMillisType(), res);
> >>>         registerAdvancedType(new LocalDateType(), res);
> >>>
> >>>         return res;
> >>>     }
> >>> }
> >>>
> >>>
> >>> /*
> >>>
> >>> how we use above deser class
> >>> */
> >>>
> >>> SpecificRecordSerDe<PayloadRecord> deserializer = new SpecificRecordSerDe<>(
> >>>          PayloadRecord.class,
> >>>          PayloadRecord.getClassSchema().toString(),
> >>>          this.schemaRegistry);
> >>> FlinkKafkaConsumer consumer = new FlinkKafkaConsumer(
> >>>          this.inputTopic,
> >>>          deserializer,
> >>>          this.sourceSettings);
> >>>
> >>>
> >>>
> >>> Thanks
> >>>
> >>> Lian
> >>>
> >>>
> >>>
> >>>
> >>> On Thu, Sep 17, 2020 at 2:19 AM Dawid Wysakowicz <
> dwysakow...@apache.org>
> >>> wrote:
> >>>
> >>>> Hi,
> >>>>
> >>>> Could you share exactly how do you configure avro & kafka? Do you use
> >>>> Table API or DataStream API? Do you use the
> >>>> ConfluentRegistryDeserializationSchema that comes with Flink or did
> you
> >>>> built custom DeserializationSchema? Could you maybe share the code for
> >>>> instantiating the source with us? It could help us track down the
> >>>> problematic spot.
> >>>>
> >>>> Best,
> >>>>
> >>>> Dawid
> >>>>
> >>>> On 16/09/2020 08:09, Lian Jiang wrote:
> >>>>> Hi,
> >>>>>
> >>>>> I am using Avro 1.9.1 + Flink 1.10.1 + Confluent Kafka 5.5. In
> >>>>> IntelliJ, I can see the FlinkKafkaConsumer already deserialized the
> >>>>> upstream kafka message. However, I got below error when this message
> >>>>> is serialized during pushToOperator. Per the stack trace, the reason
> >>>>> is that AvroSerializer is created by AvroFactory.fromSpecific() which
> >>>>> creates its private copy of specificData. This private specificData
> >>>>> does not have logical type information. This blocks the deserialized
> >>>>> messages from being passed to downstream operators. Any idea how to
> >>>>> make this work? Appreciated very much!
> >>>>>
> >>>>>
> >>>>> org.apache.avro.AvroRuntimeException: Unknown datum type
> >>>>> java.time.Instant: 2020-09-15T07:00:00Z
> >>>>> at
> >>>>
> org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:887)
> >>>>> at
> >>>>>
> >>>>
> org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:420)
> >>>>> at
> >>>> org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:850)
> >>>>> at
> >>>> org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1280)
> >>>>> at
> org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199)
> >>>>> at
> >>>> org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1261)
> >>>>> at
> org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199)
> >>>>> at
> >>>> org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1280)
> >>>>> at
> org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199)
> >>>>> at
> >>>> org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1261)
> >>>>> at
> org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1199)
> >>>>> at
> >>>>>
> >>>>
> org.apache.flink.formats.avro.typeutils.AvroSerializer.copy(AvroSerializer.java:242)
> >>>>> at
> >>>>>
> >>>>
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715)
> >>>>> at
> >>>>>
> >>>>
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
> >>>>> at
> >>>>>
> >>>>
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
> >>>>> at
> >>>>>
> >>>>
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> >>>>> at
> >>>>>
> >>>>
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> >>>>> at
> >>>>>
> >>>>
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
> >>>>> at
> >>>>>
> >>>>
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
> >>>>> at
> >>>>>
> >>>>
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
> >>>>> at
> >>>>>
> >>>>
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
> >>>>> at
> >>>>>
> >>>>
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
> >>>>> at
> >>>>>
> >>>>
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
> >>>>> at
> >>>>>
> >>>>
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> >>>>
> >>>>
> >>>
> >>
> >
> >
>
>
