Hello,

I'm facing an issue with the following code:

// Source of raw String records: Kafka topic "test-topic", decoded with SimpleStringSchema.
val stream = env.addSource(
  new FlinkKafkaConsumer09[String]("test-topic", new SimpleStringSchema(), properties)
)

// Convert each raw record via BidderRawLogs(...) and key the result by strategyId.
// NOTE(review): BidderRawLogs(...) is passed to flatMap, so it presumably returns a
// collection/Option of BidderRawLogs -- confirm against its definition.
val bidderStream: KeyedStream[BidderRawLogs, Int] =
  stream
    .flatMap(raw => BidderRawLogs(raw))
    .keyBy(log => log.strategyId)

// Second input: each line of the text file is paired with the constant key 1 and the
// stream is keyed by that first tuple element.
// NOTE(review): every record here carries key 1 while bidderStream is keyed by
// strategyId -- confirm this is the intended join key.
val metaStrategy: KeyedStream[(Int, String), Int] =
  env
    .readTextFile("path")
    .name("Strategy")
    .map(line => (1, line))
    .keyBy(entry => entry._1)

// Type information for the two inputs and for the joined output, captured via TypeHint.
// NOTE(review): TypeHint is the Java-API mechanism; verify it yields the type
// information expected for Scala tuples in this job.
val dynamicTypeInfo = new TypeHint[BidderRawLogs]() {}.getTypeInfo()
val staticTypeInfo = new TypeHint[(Int, String)]() {}.getTypeInfo()
val outTypeInfo = new TypeHint[(Int, BidderRawLogs, (Int, String))]() {}.getTypeInfo()

// Two-input operator fed with BidderRawLogs on the first input and (Int, String) on the
// second, emitting (Int, BidderRawLogs, (Int, String)) triples.
// NOTE(review): JoinOperator is defined elsewhere; its join semantics are not visible here.
val joinOperator: TwoInputStreamOperator[
  BidderRawLogs,
  (Int, String),
  (Int, BidderRawLogs, (Int, String))
] = new JoinOperator[Int, BidderRawLogs, (Int, String)](dynamicTypeInfo, staticTypeInfo)

// Connect both keyed streams and run the custom operator under the name "test".
val funName = "test"
val joinedStream =
  bidderStream
    .connect(metaStrategy)
    .transform(funName, joinOperator, outTypeInfo)

Reply via email to