Hey community,
I am not sure whether it is a bug or I am doing something wrong. I have
a little snippet produced by our project (see below). When I execute it
in Eclipse everything works fine. However, when deploying the Jar to the
local flink installation I get NullPointer Exceptions after the program
had already finished. I found out that it happens exactly after the time
of the window trigger elapsed (10 seconds in this example). So it seems
that there is still a thread running, although the program has already
finished. I guess the thread does not get anymore input since the file
was completely read already and thus produces NullPointer Exceptions
when trying to write these null elements. But I think you know more
about this.
FYI: I am using Flink-0.9.0-rc4 built with Scala 2.11
So here the code:
import org.apache.flink.streaming.api.scala._
import dbis.flink._
import java.util.concurrent.TimeUnit
import org.apache.flink.streaming.api.windowing.helper._
import org.apache.flink.util.Collector
object windowCount {
def customgrpdMap(ts: Iterable[List[Any]], out:
Collector[List[Any]]) = {
out.collect(ts.groupBy(t => t(0)).flatMap(x =>
List(x._1,x._2)).toList)
}
def customcntdMap(ts: Iterable[List[Any]], out:
Collector[List[Any]]) = {
ts.foreach { t =>
out.collect(List(t(0),PigFuncs.count(t(1).asInstanceOf[Seq[Any]])))}
}
def tuplecntdToString(t: List[Any]): String = {
implicit def anyToSeq(a: Any) = a.asInstanceOf[Seq[Any]]
val sb = new StringBuilder
sb.append(t(0))
.append(",")
.append(t(1))
sb.toString
}
def main(args: Array[String]) {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val input = PigStorage().load(env, "src/it/resources/mary.txt")
val words = input.flatMap(t =>
PigFuncs.tokenize(t(0).toString)).map(t => List(t))
val win = words.window(Time.of(10,
TimeUnit.SECONDS)).every(Time.of(10, TimeUnit.SECONDS))
val grpd = win.groupBy(t => t(0)).mapWindow(customgrpdMap _)
val cntd = grpd.mapWindow(customcntdMap _).flatten()
cntd.map(t => tuplecntdToString(t)).writeAsText("marycounts.out")
env.execute("Starting Query")
}
}
And here the log output:
Exception in thread "Thread-32" java.lang.RuntimeException:
java.lang.RuntimeException: java.lang.RuntimeException:
java.lang.RuntimeException: java.lang.RuntimeException:
java.lang.RuntimeException: java.lang.RuntimeException:
java.lang.RuntimeException: java.lang.RuntimeException:
java.lang.NullPointerException
at
org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:244)
at
org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
at
org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer.emitWindow(StreamDiscretizer.java:133)
at
org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer.triggerOnFakeElement(StreamDiscretizer.java:121)
at
org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer$WindowingCallback.sendFakeElement(StreamDiscretizer.java:194)
at
org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy.activeFakeElementEmission(TimeTriggerPolicy.java:117)
at
org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy$TimeCheck.run(TimeTriggerPolicy.java:144)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: java.lang.RuntimeException:
java.lang.RuntimeException: java.lang.RuntimeException:
java.lang.RuntimeException: java.lang.RuntimeException:
java.lang.RuntimeException: java.lang.RuntimeException:
java.lang.NullPointerException
at
org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:244)
at
org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
at
org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBuffer.emitWindow(BasicWindowBuffer.java:43)
at
org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.handleWindowEvent(StreamWindowBuffer.java:55)
at
org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.handleWindowEvent(StreamWindowBuffer.java:60)
at
org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.processElement(StreamWindowBuffer.java:45)
at
org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.processElement(StreamWindowBuffer.java:29)
at
org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
... 7 more
Caused by: java.lang.RuntimeException: java.lang.RuntimeException:
java.lang.RuntimeException: java.lang.RuntimeException:
java.lang.RuntimeException: java.lang.RuntimeException:
java.lang.RuntimeException: java.lang.NullPointerException
at
org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:244)
at
org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
at
org.apache.flink.streaming.api.operators.windowing.WindowPartitioner.processElement(WindowPartitioner.java:65)
at
org.apache.flink.streaming.api.operators.windowing.WindowPartitioner.processElement(WindowPartitioner.java:29)
at
org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
... 14 more
Caused by: java.lang.RuntimeException: java.lang.RuntimeException:
java.lang.RuntimeException: java.lang.RuntimeException:
java.lang.RuntimeException: java.lang.RuntimeException:
java.lang.NullPointerException
at
org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:244)
at
org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
at
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35)
at
org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
... 18 more
Caused by: java.lang.RuntimeException: java.lang.RuntimeException:
java.lang.RuntimeException: java.lang.RuntimeException:
java.lang.RuntimeException: java.lang.NullPointerException
at
org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:244)
at
org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
at
org.apache.flink.streaming.api.operators.windowing.WindowMerger.processElement(WindowMerger.java:58)
at
org.apache.flink.streaming.api.operators.windowing.WindowMerger.processElement(WindowMerger.java:32)
at
org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
... 21 more
Caused by: java.lang.RuntimeException: java.lang.RuntimeException:
java.lang.RuntimeException: java.lang.RuntimeException:
java.lang.NullPointerException
at
org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:244)
at
org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
at
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35)
at
org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
... 25 more
Caused by: java.lang.RuntimeException: java.lang.RuntimeException:
java.lang.RuntimeException: java.lang.NullPointerException
at
org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingOperatorCollector.collect(OutputHandler.java:277)
at
org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
at
org.apache.flink.streaming.api.operators.windowing.WindowFlattener.processElement(WindowFlattener.java:41)
at
org.apache.flink.streaming.api.operators.windowing.WindowFlattener.processElement(WindowFlattener.java:28)
at
org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
... 28 more
Caused by: java.lang.RuntimeException: java.lang.RuntimeException:
java.lang.NullPointerException
at
org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingOperatorCollector.collect(OutputHandler.java:277)
at
org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
at
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35)
at
org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingOperatorCollector.collect(OutputHandler.java:272)
... 32 more
Caused by: java.lang.RuntimeException: java.lang.NullPointerException
at
org.apache.flink.streaming.api.functions.sink.FileSinkFunction.flush(FileSinkFunction.java:108)
at
org.apache.flink.streaming.api.functions.sink.FileSinkFunction.invoke(FileSinkFunction.java:65)
at
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:35)
at
org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingOperatorCollector.collect(OutputHandler.java:272)
... 35 more
Caused by: java.lang.NullPointerException
at
org.apache.flink.api.java.io.TextOutputFormat.writeRecord(TextOutputFormat.java:93)
at
org.apache.flink.streaming.api.functions.sink.FileSinkFunction.flush(FileSinkFunction.java:94)
... 38 more
And a part of the .out - File:
13:28:48,696 INFO
org.apache.flink.runtime.jobmanager.JobManager -
Received job bd1f75cb1db031b11794bf5e1123fd9a (Starting Query).
13:28:48,698 INFO
org.apache.flink.runtime.jobmanager.JobManager -
Scheduling job Starting Query.
13:28:48,698 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph - Read
Text File Source -> Map -> Flat Map -> Map -> StreamDiscretizer ->
(BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window
partitioner -> Window Map -> Window Merger -> Window Map -> Window
Flatten -> Map -> Stream Sink) (1/1)
(ff2bb914c620859de94262af78ac9269) switched from CREATED to SCHEDULED
13:28:48,698 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph - Read
Text File Source -> Map -> Flat Map -> Map -> StreamDiscretizer ->
(BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window
partitioner -> Window Map -> Window Merger -> Window Map -> Window
Flatten -> Map -> Stream Sink) (1/1)
(ff2bb914c620859de94262af78ac9269) switched from SCHEDULED to DEPLOYING
13:28:48,698 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph -
Deploying Read Text File Source -> Map -> Flat Map -> Map ->
StreamDiscretizer -> (BasicWindowBuffer, BasicWindowBuffer,
BasicWindowBuffer -> Window partitioner -> Window Map -> Window
Merger -> Window Map -> Window Flatten -> Map -> Stream Sink) (1/1)
(attempt #0) to localhost
13:28:48,699 INFO
org.apache.flink.runtime.taskmanager.TaskManager -
Received task Read Text File Source -> Map -> Flat Map -> Map ->
StreamDiscretizer -> (BasicWindowBuffer, BasicWindowBuffer,
BasicWindowBuffer -> Window partitioner -> Window Map -> Window
Merger -> Window Map -> Window Flatten -> Map -> Stream Sink) (1/1)
13:28:48,699 INFO
org.apache.flink.runtime.jobmanager.JobManager -
Status of job bd1f75cb1db031b11794bf5e1123fd9a (Starting Query)
changed to RUNNING.
13:28:48,705 INFO
org.apache.flink.runtime.taskmanager.Task -
Loading JAR files for task Read Text File Source -> Map -> Flat Map
-> Map -> StreamDiscretizer -> (BasicWindowBuffer,
BasicWindowBuffer, BasicWindowBuffer -> Window partitioner -> Window
Map -> Window Merger -> Window Map -> Window Flatten -> Map ->
Stream Sink) (1/1)
13:28:48,707 INFO
org.apache.flink.runtime.blob.BlobCache -
Downloading f2612fe1d4aadc5206820be652dfa1019a66007c from
localhost/127.0.0.1:47210
13:28:48,709 INFO
org.apache.flink.runtime.taskmanager.Task -
Registering task at network: Read Text File Source -> Map -> Flat
Map -> Map -> StreamDiscretizer -> (BasicWindowBuffer,
BasicWindowBuffer, BasicWindowBuffer -> Window partitioner -> Window
Map -> Window Merger -> Window Map -> Window Flatten -> Map ->
Stream Sink) (1/1) [DEPLOYING]
13:28:48,709 INFO
org.apache.flink.streaming.runtime.tasks.StreamTask -
State backend for state checkpoints is set to jobmanager.
13:28:48,759 INFO
org.apache.flink.runtime.taskmanager.Task - Read
Text File Source -> Map -> Flat Map -> Map -> StreamDiscretizer ->
(BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window
partitioner -> Window Map -> Window Merger -> Window Map -> Window
Flatten -> Map -> Stream Sink) (1/1) switched to RUNNING
13:28:48,759 INFO
org.apache.flink.api.common.io.LocatableInputSplitAssigner -
Assigning remote split to host localhost
13:28:48,759 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph - Read
Text File Source -> Map -> Flat Map -> Map -> StreamDiscretizer ->
(BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window
partitioner -> Window Map -> Window Merger -> Window Map -> Window
Flatten -> Map -> Stream Sink) (1/1)
(ff2bb914c620859de94262af78ac9269) switched from DEPLOYING to RUNNING
13:28:48,786 INFO
org.apache.flink.runtime.taskmanager.Task - Read
Text File Source -> Map -> Flat Map -> Map -> StreamDiscretizer ->
(BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window
partitioner -> Window Map -> Window Merger -> Window Map -> Window
Flatten -> Map -> Stream Sink) (1/1) switched to FINISHED
13:28:48,786 INFO
org.apache.flink.runtime.taskmanager.Task -
Freeing task resources for Read Text File Source -> Map -> Flat Map
-> Map -> StreamDiscretizer -> (BasicWindowBuffer,
BasicWindowBuffer, BasicWindowBuffer -> Window partitioner -> Window
Map -> Window Merger -> Window Map -> Window Flatten -> Map ->
Stream Sink) (1/1)
13:28:48,786 INFO
org.apache.flink.runtime.taskmanager.TaskManager -
Unregistering task and sending final execution state FINISHED to
JobManager for task Read Text File Source -> Map -> Flat Map -> Map
-> StreamDiscretizer -> (BasicWindowBuffer, BasicWindowBuffer,
BasicWindowBuffer -> Window partitioner -> Window Map -> Window
Merger -> Window Map -> Window Flatten -> Map -> Stream Sink)
(ff2bb914c620859de94262af78ac9269)
13:28:48,787 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph - Read
Text File Source -> Map -> Flat Map -> Map -> StreamDiscretizer ->
(BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window
partitioner -> Window Map -> Window Merger -> Window Map -> Window
Flatten -> Map -> Stream Sink) (1/1)
(ff2bb914c620859de94262af78ac9269) switched from RUNNING to FINISHED
13:28:48,787 INFO
org.apache.flink.runtime.jobmanager.JobManager -
Status of job bd1f75cb1db031b11794bf5e1123fd9a (Starting Query)
changed to FINISHED.
13:28:58,793 ERROR
org.apache.flink.streaming.api.functions.sink.FileSinkFunction -
Error while writing element.
java.lang.NullPointerException
at
org.apache.flink.api.java.io.TextOutputFormat.writeRecord(TextOutputFormat.java:93)
at
org.apache.flink.streaming.api.functions.sink.FileSinkFunction.flush(FileSinkFunction.java:94)
at
org.apache.flink.streaming.api.functions.sink.FileSinkFunction.invoke(FileSinkFunction.java:65)
at
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:35)
at
org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingOperatorCollector.collect(OutputHandler.java:272)
at
org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
at
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35)
at
org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingOperatorCollector.collect(OutputHandler.java:272)
at
org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
at
org.apache.flink.streaming.api.operators.windowing.WindowFlattener.processElement(WindowFlattener.java:41)
at
org.apache.flink.streaming.api.operators.windowing.WindowFlattener.processElement(WindowFlattener.java:28)
at
org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
at
org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
at
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35)
at
org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
at
org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
at
org.apache.flink.streaming.api.operators.windowing.WindowMerger.processElement(WindowMerger.java:58)
at
org.apache.flink.streaming.api.operators.windowing.WindowMerger.processElement(WindowMerger.java:32)
at
org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
at
org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
at
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35)
at
org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
at
org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
at
org.apache.flink.streaming.api.operators.windowing.WindowPartitioner.processElement(WindowPartitioner.java:65)
at
org.apache.flink.streaming.api.operators.windowing.WindowPartitioner.processElement(WindowPartitioner.java:29)
at
org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
at
org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
at
org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBuffer.emitWindow(BasicWindowBuffer.java:43)
at
org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.handleWindowEvent(StreamWindowBuffer.java:55)
at
org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.handleWindowEvent(StreamWindowBuffer.java:60)
at
org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.processElement(StreamWindowBuffer.java:45)
at
org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.processElement(StreamWindowBuffer.java:29)
at
org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
at
org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
at
org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer.emitWindow(StreamDiscretizer.java:133)
at
org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer.triggerOnFakeElement(StreamDiscretizer.java:121)
at
org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer$WindowingCallback.sendFakeElement(StreamDiscretizer.java:194)
at
org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy.activeFakeElementEmission(TimeTriggerPolicy.java:117)
at
org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy$TimeCheck.run(TimeTriggerPolicy.java:144)
at java.lang.Thread.run(Thread.java:745)
13:28:58,794 ERROR
org.apache.flink.streaming.runtime.tasks.OutputHandler -
Could not forward element to operator.
java.lang.RuntimeException: java.lang.NullPointerException
at
org.apache.flink.streaming.api.functions.sink.FileSinkFunction.flush(FileSinkFunction.java:108)
at
org.apache.flink.streaming.api.functions.sink.FileSinkFunction.invoke(FileSinkFunction.java:65)
at
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:35)
at
org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingOperatorCollector.collect(OutputHandler.java:272)
at
org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
at
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35)
at
org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingOperatorCollector.collect(OutputHandler.java:272)
at
org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
at
org.apache.flink.streaming.api.operators.windowing.WindowFlattener.processElement(WindowFlattener.java:41)
at
org.apache.flink.streaming.api.operators.windowing.WindowFlattener.processElement(WindowFlattener.java:28)
at
org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
at
org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
at
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35)
at
org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
at
org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
at
org.apache.flink.streaming.api.operators.windowing.WindowMerger.processElement(WindowMerger.java:58)
at
org.apache.flink.streaming.api.operators.windowing.WindowMerger.processElement(WindowMerger.java:32)
at
org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
at
org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
at
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35)
at
org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
at
org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
at
org.apache.flink.streaming.api.operators.windowing.WindowPartitioner.processElement(WindowPartitioner.java:65)
at
org.apache.flink.streaming.api.operators.windowing.WindowPartitioner.processElement(WindowPartitioner.java:29)
at
org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
at
org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
at
org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBuffer.emitWindow(BasicWindowBuffer.java:43)
at
org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.handleWindowEvent(StreamWindowBuffer.java:55)
at
org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.handleWindowEvent(StreamWindowBuffer.java:60)
at
org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.processElement(StreamWindowBuffer.java:45)
at
org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.processElement(StreamWindowBuffer.java:29)
at
org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
at
org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
at
org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer.emitWindow(StreamDiscretizer.java:133)
at
org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer.triggerOnFakeElement(StreamDiscretizer.java:121)
at
org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer$WindowingCallback.sendFakeElement(StreamDiscretizer.java:194)
at
org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy.activeFakeElementEmission(TimeTriggerPolicy.java:117)
at
org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy$TimeCheck.run(TimeTriggerPolicy.java:144)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
at
org.apache.flink.api.java.io.TextOutputFormat.writeRecord(TextOutputFormat.java:93)
at
org.apache.flink.streaming.api.functions.sink.FileSinkFunction.flush(FileSinkFunction.java:94)
... 38 more
[...]
Best Regards,
Philipp