[ https://issues.apache.org/jira/browse/FLINK-8405?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16321854#comment-16321854 ]
Marek Barak edited comment on FLINK-8405 at 1/11/18 8:28 AM:
-------------------------------------------------------------

Thank you [~fhueske] for your answer. I agree that it is a duplicate, and I will close this issue.

was (Author: mbarak):
Thank you [~fhueske] for your answer. I agree that it is a duplicate, and will close this issue.

> Keyed State in broadcasted data steam.
> ---------------------------------------
>
> Key: FLINK-8405
> URL: https://issues.apache.org/jira/browse/FLINK-8405
> Project: Flink
> Issue Type: Bug
> Reporter: Marek Barak
>
> Hi guys,
> I am trying to join two streams, where the second stream is a code list used by
> the first stream for enrichment. I followed the guide described here:
> https://www.safaribooksonline.com/library/view/stream-processing-with/9781491974285/ch04.html
> with the distinction that instead of a local HashMap, I used MapState.
> This part is important, since I want my state to be properly checkpointed
> in case of a failure.
> I managed to reproduce the issue with the following code:
> {code}
> import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
> import org.apache.flink.configuration.Configuration
> import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
> import org.apache.flink.util.Collector
> import org.apache.flink.streaming.api.scala._
> import org.junit.{Test, Assert }
> class SimpleTest extends StreamingMultipleProgramsTestBase {
>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>   case object StateMap extends RichCoFlatMapFunction[String, (String, Int), Int] {
>     var codeList: MapState[String,Int] = _
>     override def open(parameters: Configuration): Unit = {
>       codeList = getRuntimeContext.getMapState(
>         new MapStateDescriptor[String,Int]("test", classOf[String], classOf[Int])
>       )
>     }
>     override def flatMap1(value: String, out: Collector[Int]): Unit = {
>       val res = if(codeList.contains(value)) codeList.get(value) else 0
>       out.collect(res)
>     }
>     override def flatMap2(value: (String, Int), out: Collector[Int]): Unit = {
>       codeList.put(value._1, value._2)
>       out.close()
>     }
>   }
>   @Test
>   def job() = {
>     val inputStream = env.fromCollection(List("Some", "Some2", "Some3"))
>     val dictStream = env.fromCollection(List("Some" -> 1, "Some2" -> 2, "Some3" -> 3))
>     inputStream
>       .connect(dictStream.broadcast)
>       .flatMap(StateMap)
>     env.execute()
>     Assert.assertEquals(1, 1)
>   }
> }
> {code}
> I always get the following issue:
> {code}
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:897)
>   at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
>   at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
>   at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>   at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.NullPointerException: Keyed state can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation.
>   at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:75)
>   at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.checkPreconditionsAndGetKeyedStateStore(StreamingRuntimeContext.java:161)
>   at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:153)
>   at com.triviadata.sherlog.streaming.job.SimpleTest$StateMap$.open(SimpleTest.scala:23)
>   at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>   at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>   at org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.open(CoStreamFlatMap.java:46)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> I guess the main problem is:
> {code}
> Caused by: java.lang.NullPointerException: Keyed state can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation.
> {code}
> I also tried:
> {code}
> inputStream
>   .keyBy(a => a)
>   .connect(dictStream.broadcast)
>   .flatMap(StateMap)
> {code}
> But I still got the same issue. Either way, I think it should work without
> calling keyBy on either of the streams; otherwise, what would be the reason to
> broadcast anything?
> FYI, I am running:
> OSX 10.13.1
> Java: Oracle 1.8.0_92
> Scala: 2.11.11
> Flink: 1.3.2; I also tried 1.4.0 but got the same problem.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)