Hey Phillip,

thanks for reporting the problem. I think your assessment is correct. If the 
program is already finished, the threads throwing the Exceptions should have 
been cleaned up as well.

I am not sure, but I think parts of the system touching this mechanism could 
have been reworked by Aljoscha in the current master branch. Is it possible for 
you to try it out? If yes, it would be great to know if it is fixed there. As 
far as I know, there were no API breaking changes in the meantime.

@Aljoscha: do you think this is fixed with your latest changes in 0.10-SNAPSHOT?

– Ufuk

On 28 Jul 2015, at 14:02, Philipp Goetze <philipp.goe...@tu-ilmenau.de> wrote:

> Hey community,
> 
> I am not sure whether it is a bug or I am doing something wrong. I have a 
> little snippet produced by our project (see below). When I execute it in 
> Eclipse everything works fine. However, when deploying the Jar to the local 
> flink installation I get NullPointer Exceptions after the program had already 
> finished. I found out that it happens exactly after the time of the window 
> trigger elapsed (10 seconds in this example). So it seems that there is still 
> a thread running, although the program has already finished. I guess the 
> thread does not get anymore input since the file was completely read already 
> and thus produces NullPointer Exceptions when trying to write these null 
> elements. But I think you know more about this.
> 
> FYI: I am using Flink-0.9.0-rc4 built with Scala 2.11
> 
> So here the code:
> 
> import org.apache.flink.streaming.api.scala._
> import dbis.flink._
> import java.util.concurrent.TimeUnit
> import org.apache.flink.streaming.api.windowing.helper._
> import org.apache.flink.util.Collector
> 
> 
> object windowCount {
> 
>   def customgrpdMap(ts: Iterable[List[Any]], out: Collector[List[Any]]) = { 
>     out.collect(ts.groupBy(t => t(0)).flatMap(x => List(x._1,x._2)).toList)
>   }
> 
>   def customcntdMap(ts: Iterable[List[Any]], out: Collector[List[Any]]) = { 
>     ts.foreach { t => 
> out.collect(List(t(0),PigFuncs.count(t(1).asInstanceOf[Seq[Any]])))}
>   }
> 
>   def tuplecntdToString(t: List[Any]): String = { 
>     implicit def anyToSeq(a: Any) = a.asInstanceOf[Seq[Any]]
> 
>     val sb = new StringBuilder
>     sb.append(t(0))
>     .append(",")
>     .append(t(1))
>     sb.toString
>   }
> 
>   def main(args: Array[String]) {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     val input = PigStorage().load(env, "src/it/resources/mary.txt")
>     val words = input.flatMap(t => PigFuncs.tokenize(t(0).toString)).map(t => 
> List(t))
> 
>     val win = words.window(Time.of(10, TimeUnit.SECONDS)).every(Time.of(10, 
> TimeUnit.SECONDS))
>     val grpd = win.groupBy(t => t(0)).mapWindow(customgrpdMap _)
>     val cntd = grpd.mapWindow(customcntdMap _).flatten()
> 
>     cntd.map(t => tuplecntdToString(t)).writeAsText("marycounts.out")
>     env.execute("Starting Query")
>   }
> }
> 
> And here the log output:
> 
> Exception in thread "Thread-32" java.lang.RuntimeException: 
> java.lang.RuntimeException: java.lang.RuntimeException: 
> java.lang.RuntimeException: java.lang.RuntimeException: 
> java.lang.RuntimeException: java.lang.RuntimeException: 
> java.lang.RuntimeException: java.lang.RuntimeException: 
> java.lang.NullPointerException
>     at 
> org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:244)
>     at 
> org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at 
> org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer.emitWindow(StreamDiscretizer.java:133)
>     at 
> org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer.triggerOnFakeElement(StreamDiscretizer.java:121)
>     at 
> org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer$WindowingCallback.sendFakeElement(StreamDiscretizer.java:194)
>     at 
> org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy.activeFakeElementEmission(TimeTriggerPolicy.java:117)
>     at 
> org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy$TimeCheck.run(TimeTriggerPolicy.java:144)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: java.lang.RuntimeException: 
> java.lang.RuntimeException: java.lang.RuntimeException: 
> java.lang.RuntimeException: java.lang.RuntimeException: 
> java.lang.RuntimeException: java.lang.RuntimeException: 
> java.lang.NullPointerException
>     at 
> org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:244)
>     at 
> org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at 
> org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBuffer.emitWindow(BasicWindowBuffer.java:43)
>     at 
> org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.handleWindowEvent(StreamWindowBuffer.java:55)
>     at 
> org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.handleWindowEvent(StreamWindowBuffer.java:60)
>     at 
> org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.processElement(StreamWindowBuffer.java:45)
>     at 
> org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.processElement(StreamWindowBuffer.java:29)
>     at 
> org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     ... 7 more
> Caused by: java.lang.RuntimeException: java.lang.RuntimeException: 
> java.lang.RuntimeException: java.lang.RuntimeException: 
> java.lang.RuntimeException: java.lang.RuntimeException: 
> java.lang.RuntimeException: java.lang.NullPointerException
>     at 
> org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:244)
>     at 
> org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at 
> org.apache.flink.streaming.api.operators.windowing.WindowPartitioner.processElement(WindowPartitioner.java:65)
>     at 
> org.apache.flink.streaming.api.operators.windowing.WindowPartitioner.processElement(WindowPartitioner.java:29)
>     at 
> org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     ... 14 more
> Caused by: java.lang.RuntimeException: java.lang.RuntimeException: 
> java.lang.RuntimeException: java.lang.RuntimeException: 
> java.lang.RuntimeException: java.lang.RuntimeException: 
> java.lang.NullPointerException
>     at 
> org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:244)
>     at 
> org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at 
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35)
>     at 
> org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     ... 18 more
> Caused by: java.lang.RuntimeException: java.lang.RuntimeException: 
> java.lang.RuntimeException: java.lang.RuntimeException: 
> java.lang.RuntimeException: java.lang.NullPointerException
>     at 
> org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:244)
>     at 
> org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at 
> org.apache.flink.streaming.api.operators.windowing.WindowMerger.processElement(WindowMerger.java:58)
>     at 
> org.apache.flink.streaming.api.operators.windowing.WindowMerger.processElement(WindowMerger.java:32)
>     at 
> org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     ... 21 more
> Caused by: java.lang.RuntimeException: java.lang.RuntimeException: 
> java.lang.RuntimeException: java.lang.RuntimeException: 
> java.lang.NullPointerException
>     at 
> org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:244)
>     at 
> org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at 
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35)
>     at 
> org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     ... 25 more
> Caused by: java.lang.RuntimeException: java.lang.RuntimeException: 
> java.lang.RuntimeException: java.lang.NullPointerException
>     at 
> org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingOperatorCollector.collect(OutputHandler.java:277)
>     at 
> org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at 
> org.apache.flink.streaming.api.operators.windowing.WindowFlattener.processElement(WindowFlattener.java:41)
>     at 
> org.apache.flink.streaming.api.operators.windowing.WindowFlattener.processElement(WindowFlattener.java:28)
>     at 
> org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     ... 28 more
> Caused by: java.lang.RuntimeException: java.lang.RuntimeException: 
> java.lang.NullPointerException
>     at 
> org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingOperatorCollector.collect(OutputHandler.java:277)
>     at 
> org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at 
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35)
>     at 
> org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingOperatorCollector.collect(OutputHandler.java:272)
>     ... 32 more
> Caused by: java.lang.RuntimeException: java.lang.NullPointerException
>     at 
> org.apache.flink.streaming.api.functions.sink.FileSinkFunction.flush(FileSinkFunction.java:108)
>     at 
> org.apache.flink.streaming.api.functions.sink.FileSinkFunction.invoke(FileSinkFunction.java:65)
>     at 
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:35)
>     at 
> org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingOperatorCollector.collect(OutputHandler.java:272)
>     ... 35 more
> Caused by: java.lang.NullPointerException
>     at 
> org.apache.flink.api.java.io.TextOutputFormat.writeRecord(TextOutputFormat.java:93)
>     at 
> org.apache.flink.streaming.api.functions.sink.FileSinkFunction.flush(FileSinkFunction.java:94)
>     ... 38 more
> 
> And a part of the .out - File:
> 
> 13:28:48,696 INFO  org.apache.flink.runtime.jobmanager.JobManager             
>    - Received job bd1f75cb1db031b11794bf5e1123fd9a (Starting Query).
> 13:28:48,698 INFO  org.apache.flink.runtime.jobmanager.JobManager             
>    - Scheduling job Starting Query.
> 13:28:48,698 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Read Text File Source -> Map -> Flat Map -> Map -> StreamDiscretizer -> 
> (BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window 
> partitioner -> Window Map -> Window Merger -> Window Map -> Window Flatten -> 
> Map -> Stream Sink) (1/1) (ff2bb914c620859de94262af78ac9269) switched from 
> CREATED to SCHEDULED
> 13:28:48,698 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Read Text File Source -> Map -> Flat Map -> Map -> StreamDiscretizer -> 
> (BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window 
> partitioner -> Window Map -> Window Merger -> Window Map -> Window Flatten -> 
> Map -> Stream Sink) (1/1) (ff2bb914c620859de94262af78ac9269) switched from 
> SCHEDULED to DEPLOYING
> 13:28:48,698 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Deploying Read Text File Source -> Map -> Flat Map -> Map -> 
> StreamDiscretizer -> (BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer 
> -> Window partitioner -> Window Map -> Window Merger -> Window Map -> Window 
> Flatten -> Map -> Stream Sink) (1/1) (attempt #0) to localhost
> 13:28:48,699 INFO  org.apache.flink.runtime.taskmanager.TaskManager           
>    - Received task Read Text File Source -> Map -> Flat Map -> Map -> 
> StreamDiscretizer -> (BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer 
> -> Window partitioner -> Window Map -> Window Merger -> Window Map -> Window 
> Flatten -> Map -> Stream Sink) (1/1)
> 13:28:48,699 INFO  org.apache.flink.runtime.jobmanager.JobManager             
>    - Status of job bd1f75cb1db031b11794bf5e1123fd9a (Starting Query) changed 
> to RUNNING.
> 13:28:48,705 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Loading JAR files for task Read Text File Source -> Map -> Flat Map -> 
> Map -> StreamDiscretizer -> (BasicWindowBuffer, BasicWindowBuffer, 
> BasicWindowBuffer -> Window partitioner -> Window Map -> Window Merger -> 
> Window Map -> Window Flatten -> Map -> Stream Sink) (1/1)
> 13:28:48,707 INFO  org.apache.flink.runtime.blob.BlobCache                    
>    - Downloading f2612fe1d4aadc5206820be652dfa1019a66007c from 
> localhost/127.0.0.1:47210
> 13:28:48,709 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Registering task at network: Read Text File Source -> Map -> Flat Map -> 
> Map -> StreamDiscretizer -> (BasicWindowBuffer, BasicWindowBuffer, 
> BasicWindowBuffer -> Window partitioner -> Window Map -> Window Merger -> 
> Window Map -> Window Flatten -> Map -> Stream Sink) (1/1) [DEPLOYING]
> 13:28:48,709 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask        
>    - State backend for state checkpoints is set to jobmanager.
> 13:28:48,759 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Read Text File Source -> Map -> Flat Map -> Map -> StreamDiscretizer -> 
> (BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window 
> partitioner -> Window Map -> Window Merger -> Window Map -> Window Flatten -> 
> Map -> Stream Sink) (1/1) switched to RUNNING
> 13:28:48,759 INFO  org.apache.flink.api.common.io.LocatableInputSplitAssigner 
>    - Assigning remote split to host localhost
> 13:28:48,759 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Read Text File Source -> Map -> Flat Map -> Map -> StreamDiscretizer -> 
> (BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window 
> partitioner -> Window Map -> Window Merger -> Window Map -> Window Flatten -> 
> Map -> Stream Sink) (1/1) (ff2bb914c620859de94262af78ac9269) switched from 
> DEPLOYING to RUNNING
> 13:28:48,786 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Read Text File Source -> Map -> Flat Map -> Map -> StreamDiscretizer -> 
> (BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window 
> partitioner -> Window Map -> Window Merger -> Window Map -> Window Flatten -> 
> Map -> Stream Sink) (1/1) switched to FINISHED
> 13:28:48,786 INFO  org.apache.flink.runtime.taskmanager.Task                  
>    - Freeing task resources for Read Text File Source -> Map -> Flat Map -> 
> Map -> StreamDiscretizer -> (BasicWindowBuffer, BasicWindowBuffer, 
> BasicWindowBuffer -> Window partitioner -> Window Map -> Window Merger -> 
> Window Map -> Window Flatten -> Map -> Stream Sink) (1/1)
> 13:28:48,786 INFO  org.apache.flink.runtime.taskmanager.TaskManager           
>    - Unregistering task and sending final execution state FINISHED to 
> JobManager for task Read Text File Source -> Map -> Flat Map -> Map -> 
> StreamDiscretizer -> (BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer 
> -> Window partitioner -> Window Map -> Window Merger -> Window Map -> Window 
> Flatten -> Map -> Stream Sink) (ff2bb914c620859de94262af78ac9269)
> 13:28:48,787 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph     
>    - Read Text File Source -> Map -> Flat Map -> Map -> StreamDiscretizer -> 
> (BasicWindowBuffer, BasicWindowBuffer, BasicWindowBuffer -> Window 
> partitioner -> Window Map -> Window Merger -> Window Map -> Window Flatten -> 
> Map -> Stream Sink) (1/1) (ff2bb914c620859de94262af78ac9269) switched from 
> RUNNING to FINISHED
> 13:28:48,787 INFO  org.apache.flink.runtime.jobmanager.JobManager             
>    - Status of job bd1f75cb1db031b11794bf5e1123fd9a (Starting Query) changed 
> to FINISHED.
> 13:28:58,793 ERROR 
> org.apache.flink.streaming.api.functions.sink.FileSinkFunction  - Error while 
> writing element.
> java.lang.NullPointerException
>     at 
> org.apache.flink.api.java.io.TextOutputFormat.writeRecord(TextOutputFormat.java:93)
>     at 
> org.apache.flink.streaming.api.functions.sink.FileSinkFunction.flush(FileSinkFunction.java:94)
>     at 
> org.apache.flink.streaming.api.functions.sink.FileSinkFunction.invoke(FileSinkFunction.java:65)
>     at 
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:35)
>     at 
> org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingOperatorCollector.collect(OutputHandler.java:272)
>     at 
> org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at 
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35)
>     at 
> org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingOperatorCollector.collect(OutputHandler.java:272)
>     at 
> org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at 
> org.apache.flink.streaming.api.operators.windowing.WindowFlattener.processElement(WindowFlattener.java:41)
>     at 
> org.apache.flink.streaming.api.operators.windowing.WindowFlattener.processElement(WindowFlattener.java:28)
>     at 
> org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     at 
> org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at 
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35)
>     at 
> org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     at 
> org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at 
> org.apache.flink.streaming.api.operators.windowing.WindowMerger.processElement(WindowMerger.java:58)
>     at 
> org.apache.flink.streaming.api.operators.windowing.WindowMerger.processElement(WindowMerger.java:32)
>     at 
> org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     at 
> org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at 
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35)
>     at 
> org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     at 
> org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at 
> org.apache.flink.streaming.api.operators.windowing.WindowPartitioner.processElement(WindowPartitioner.java:65)
>     at 
> org.apache.flink.streaming.api.operators.windowing.WindowPartitioner.processElement(WindowPartitioner.java:29)
>     at 
> org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     at 
> org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at 
> org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBuffer.emitWindow(BasicWindowBuffer.java:43)
>     at 
> org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.handleWindowEvent(StreamWindowBuffer.java:55)
>     at 
> org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.handleWindowEvent(StreamWindowBuffer.java:60)
>     at 
> org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.processElement(StreamWindowBuffer.java:45)
>     at 
> org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.processElement(StreamWindowBuffer.java:29)
>     at 
> org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     at 
> org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at 
> org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer.emitWindow(StreamDiscretizer.java:133)
>     at 
> org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer.triggerOnFakeElement(StreamDiscretizer.java:121)
>     at 
> org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer$WindowingCallback.sendFakeElement(StreamDiscretizer.java:194)
>     at 
> org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy.activeFakeElementEmission(TimeTriggerPolicy.java:117)
>     at 
> org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy$TimeCheck.run(TimeTriggerPolicy.java:144)
>     at java.lang.Thread.run(Thread.java:745)
> 13:28:58,794 ERROR org.apache.flink.streaming.runtime.tasks.OutputHandler     
>    - Could not forward element to operator.
> java.lang.RuntimeException: java.lang.NullPointerException
>     at 
> org.apache.flink.streaming.api.functions.sink.FileSinkFunction.flush(FileSinkFunction.java:108)
>     at 
> org.apache.flink.streaming.api.functions.sink.FileSinkFunction.invoke(FileSinkFunction.java:65)
>     at 
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:35)
>     at 
> org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingOperatorCollector.collect(OutputHandler.java:272)
>     at 
> org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at 
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35)
>     at 
> org.apache.flink.streaming.runtime.tasks.OutputHandler$CopyingOperatorCollector.collect(OutputHandler.java:272)
>     at 
> org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at 
> org.apache.flink.streaming.api.operators.windowing.WindowFlattener.processElement(WindowFlattener.java:41)
>     at 
> org.apache.flink.streaming.api.operators.windowing.WindowFlattener.processElement(WindowFlattener.java:28)
>     at 
> org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     at 
> org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at 
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35)
>     at 
> org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     at 
> org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at 
> org.apache.flink.streaming.api.operators.windowing.WindowMerger.processElement(WindowMerger.java:58)
>     at 
> org.apache.flink.streaming.api.operators.windowing.WindowMerger.processElement(WindowMerger.java:32)
>     at 
> org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     at 
> org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at 
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:35)
>     at 
> org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     at 
> org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at 
> org.apache.flink.streaming.api.operators.windowing.WindowPartitioner.processElement(WindowPartitioner.java:65)
>     at 
> org.apache.flink.streaming.api.operators.windowing.WindowPartitioner.processElement(WindowPartitioner.java:29)
>     at 
> org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     at 
> org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at 
> org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBuffer.emitWindow(BasicWindowBuffer.java:43)
>     at 
> org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.handleWindowEvent(StreamWindowBuffer.java:55)
>     at 
> org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.handleWindowEvent(StreamWindowBuffer.java:60)
>     at 
> org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.processElement(StreamWindowBuffer.java:45)
>     at 
> org.apache.flink.streaming.api.operators.windowing.StreamWindowBuffer.processElement(StreamWindowBuffer.java:29)
>     at 
> org.apache.flink.streaming.runtime.tasks.OutputHandler$OperatorCollector.collect(OutputHandler.java:239)
>     at 
> org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:40)
>     at 
> org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer.emitWindow(StreamDiscretizer.java:133)
>     at 
> org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer.triggerOnFakeElement(StreamDiscretizer.java:121)
>     at 
> org.apache.flink.streaming.api.operators.windowing.StreamDiscretizer$WindowingCallback.sendFakeElement(StreamDiscretizer.java:194)
>     at 
> org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy.activeFakeElementEmission(TimeTriggerPolicy.java:117)
>     at 
> org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy$TimeCheck.run(TimeTriggerPolicy.java:144)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
>     at 
> org.apache.flink.api.java.io.TextOutputFormat.writeRecord(TextOutputFormat.java:93)
>     at 
> org.apache.flink.streaming.api.functions.sink.FileSinkFunction.flush(FileSinkFunction.java:94)
>     ... 38 more
> [...]
> 
> 
> 
> Best Regards,
> Philipp
> 

Reply via email to