I am trying to convert Scala code (which works fine) to Java.
The Scala code is:
// create Kafka consumers
// Data
val dataConsumer = new FlinkKafkaConsumer010[Array[Byte]](
  DATA_TOPIC,
  new ByteArraySchema,
  dataKafkaProps
)

// Model
val modelConsumer = new FlinkKafkaConsumer010[Array[Byte]](
  MODELS_TOPIC,
  new ByteArraySchema,
  modelKafkaProps
)

// Create input data streams
val modelsStream = env.addSource(modelConsumer)
val dataStream = env.addSource(dataConsumer)

// Read data from streams
val models = modelsStream.map(ModelToServe.fromByteArray(_))
  .flatMap(BadDataHandler[ModelToServe])
  .keyBy(_.dataType)
val data = dataStream.map(DataRecord.fromByteArray(_))
  .flatMap(BadDataHandler[WineRecord])
  .keyBy(_.dataType)
Now I am trying to rewrite it in Java, and I am fighting with the requirement
to provide types explicitly where they should be obvious:

// create Kafka consumers
// Data
FlinkKafkaConsumer010<byte[]> dataConsumer = new FlinkKafkaConsumer010<>(
        ModelServingConfiguration.DATA_TOPIC,
        new ByteArraySchema(),
        dataKafkaProps);

// Model
FlinkKafkaConsumer010<byte[]> modelConsumer = new FlinkKafkaConsumer010<>(
        ModelServingConfiguration.MODELS_TOPIC,
        new ByteArraySchema(),
        modelKafkaProps);

// Create input data streams
DataStream<byte[]> modelsStream = env.addSource(modelConsumer,
        PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO);
DataStream<byte[]> dataStream = env.addSource(dataConsumer,
        PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO);

// Read data from streams
DataStream<Tuple2<String, ModelToServe>> models = modelsStream
        .flatMap(new ModelConverter(),
                new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO,
                        TypeInformation.of(ModelToServe.class)));

Am I missing something in Java similar to import org.apache.flink.api.scala._?

Now, if this is the only way, does this look right?

Boris Lublinsky
FDP Architect
boris.lublin...@lightbend.com
https://www.lightbend.com/
