hi, dear all :

为什么我通过 flatmap 写 liststate,  extends  RichFlatMapFunction, 一直报 
java.lang.NoClassDefFoundError ??


stream.map(x => (x._4._1, x._4._2))
  .keyBy(_._2)
  .flatMap(new ReceptionListStateFunction2)


class ReceptionListStateFunction2 extends RichFlatMapFunction[(String, Int), 
List[String]] {
var myState: ListState[String] = _

override def flatMap(value: (String, Int), out: Collector[List[String]]): Unit 
= {
if (value._2 == 1) {
myState.add(value._1)
    } 

val states = myState.get().iterator()
val listBuf: ListBuffer[String] = new ListBuffer[String]()
while (states.hasNext) {
      listBuf.append(states.next())
    }
    out.collect(listBuf.toList)
  }
}
错误日志:

Exception in thread "main" java.util.concurrent.ExecutionException: 
scala.tools.reflect.ToolBoxError: reflective compilation has failed: cannot 
initialize the compiler due to java.lang.NoClassDefFoundError: Could not 
initialize class scala.tools.nsc.Properties$

at 
org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:299)

at 
org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:286)

at 
org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)

at 
org.apache.flink.shaded.guava18.com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:137)

at 
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2348)

at 
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2320)

at 
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282)

at 
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197)

at 
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937)

at 
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739)

at 
org.apache.flink.api.scala.typeutils.TraversableSerializer$.compileCbf(TraversableSerializer.scala:184)

at 
org.apache.flink.api.scala.typeutils.TraversableSerializer.compileCbf(TraversableSerializer.scala:51)

at 
org.apache.flink.api.scala.typeutils.TraversableSerializer.<init>(TraversableSerializer.scala:41)









回复