Hello, I'm fac
val stream = env.addSource(new FlinkKafkaConsumer09[String]("test-topic", new SimpleStringSchema(), properties)) val bidderStream: KeyedStream[BidderRawLogs, Int] = stream.flatMap(b => BidderRawLogs(b)).keyBy(b => b.strategyId) val metaStrategy: KeyedStream[(Int, String), Int] = env.readTextFile("path").name("Strategy") .map((1, _) ).keyBy(_._1) val staticTypeInfo = new TypeHint[(Int, String)]() {}.getTypeInfo() val dynamicTypeInfo = new TypeHint[BidderRawLogs]() {}.getTypeInfo() val joinOperator: TwoInputStreamOperator[BidderRawLogs, (Int, String), (Int, BidderRawLogs, (Int, String))] = new JoinOperator[Int, BidderRawLogs, (Int, String)] (dynamicTypeInfo, staticTypeInfo) val outTypeInfo = new TypeHint[(Int, BidderRawLogs, (Int, String))]() {}.getTypeInfo() val funName = "test" val joinedStream = bidderStream.connect(metaStrategy) .transform(funName, joinOperator, outTypeInfo)