No, I think that should be all right.

On 06.05.20 16:57, Vishwas Siravara wrote:
Thanks I figured that would be the case. I m using the flink tuple type in
the map functions ,so there is no casting required now. Can you think of
any downsides of using flink tuples in scala code, especially since the
flink tuple is in the java api package in flink ?

Best,
Nick.

On Wed, May 6, 2020 at 9:52 AM Aljoscha Krettek <aljos...@apache.org> wrote:

Hi,

Flink will not do any casting between types. You either need to output
to correct (Scala) Tuple type from the deserialization schema or insert
a step (say a map function) that converts between the two types. The
Tuple2 type and the Scala tuple type, i.e. (foo, bar) have nothing in
common when it comes to the type system.

Best,
Aljoscha

On 06.05.20 01:42, Nick Bendtner wrote:
Hi guys,
In our flink job we use java source for deserializing a message from
kafka
using a kafka deserializer. Signature is as below.


public class CustomAvroDeserializationSchema implements
          KafkaDeserializationSchema<Tuple2<EventMetaData,GenericRecord>>

The other parts of the streaming job are in scala. When data has to be
serialized I get this exception




*java.lang.RuntimeException: org.apache.flink.api.java.tuple.Tuple2
cannot
be cast to scala.Product at

org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
at

org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
at

org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)*

Here is how I provide type info for serialization in the java
deserialization class:

@Override
public TypeInformation<Tuple2<EventMetaData, GenericRecord>>
getProducedType() {
      return new TupleTypeInfo(TypeInformation.of(EventMetaData.class),new
GenericRecordAvroTypeInfo(this
              .writer));

Here is how I add the kafka source in scala :

private[flink] def sourceType(
    deserialization: KafkaDeserializationSchema[(EventMetaData,
GenericRecord)],
    properties: Properties): FlinkKafkaConsumer[(EventMetaData,
GenericRecord)] = {
    val consumer = new FlinkKafkaConsumer[(EventMetaData, GenericRecord)](
      source.asJava,
      deserialization,
      properties)
    consumer
}

Any idea thoughts on how to interoperate between java tuple2 and scala
case
class ? Also using 1.9.1 version of flink-connector-kafka while the rest
of
the cluster uses 1.7.2 version of flink.

Best,
Nick.





Reply via email to