This is correct. Since you didn't specify any windowing, data will only be written at the end of the global window - which will never be reached.
If you want to stay in the GlobalWindow, you can simply specify a trigger to determine how often to write the files.

On Tue, Jun 6, 2017 at 1:04 AM, Jean-Baptiste Onofré <[email protected]> wrote:
> Yes, I think it's because there's no trigger. You could add a DoFn (with
> time) to check the watermark. I think the watermark is not yet reached, so
> the trigger doesn't fire.
>
> So I don't think the problem is on HDFS; it's the window/trigger.
>
> Regards
> JB
>
> On 06/06/2017 10:00 AM, Vikas Gupta wrote:
>>
>> On 2017-06-06 13:03 (+0530), Jean-Baptiste Onofré <[email protected]>
>> wrote:
>>
>>> Hi Vikas,
>>>
>>> I think the problem is the window/triggering you are using, as you are
>>> consuming from an unbounded source (the Kafka topic).
>>>
>>> Do you use a FixedWindow?
>>>
>>> Maybe, just to test if it works, you can try something like:
>>>
>>> .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(10)))
>>>     .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
>>>         .plusDelayOf(Duration.standardSeconds(10))))
>>>     .withAllowedLateness(Duration.ZERO)
>>>     .discardingFiredPanes())
>>>
>>> just to see if HDFS is working.
>>>
>>> I guess you are also using HADOOP_CONF_DIR (or similar) to define the
>>> location of the hdfs-site.xml?
>>>
>>> Regards
>>> JB
>>>
>>> On 06/06/2017 09:27 AM, Vikas Gupta wrote:
>>>
>>>> We are facing an issue executing a Beam streaming pipeline with the
>>>> SparkRunner. This simple word-count pipeline listens to a Kafka topic
>>>> as the source, computes the word counts, and writes them to HDFS. The
>>>> pipeline is evaluated up to the processing step, which we verified by
>>>> adding a log statement on the Spark driver executor.
>>>>
>>>> Here is the code to write to HDFS:
>>>>
>>>> options.setNumShards(10);
>>>> wordCounts.apply(MapElements.via(new WordCount.FormatAsTextFn()))
>>>>     .apply("WriteCounts",
>>>>         TextIO.write().to(output).withWindowedWrites()
>>>>             .withFilenamePolicy(new PerWindowFiles("out1"))
>>>>             .withNumShards(1));
>>>>
>>>> We also tried the approach below:
>>>>
>>>> PCollection<KV<String, Long>> wordCounts =
>>>>     windowedWords.apply(new WordCount.CountWords());
>>>> wordCounts
>>>>     .apply(MapElements.via(new WordCount.FormatAsTextFn()))
>>>>     .apply(new WriteOneFilePerWindow(output, options.getNumShards()));
>>>>
>>>> None of these approaches worked.
>>>>
>>>> We tried another pipeline which reads the data from an HDFS test file,
>>>> counts the words, and writes the result back to HDFS. That worked fine.
>>>>
>>>> We are using the Cloudera sandbox 5.10.
>>>>
>>> --
>>> Jean-Baptiste Onofré
>>> [email protected]
>>> http://blog.nanthrax.net
>>> Talend - http://www.talend.com
>>>
>>
>> We checked earlier that HDFS is working fine, because we are able to run
>> a Beam pipeline that counts words where the input is in HDFS and the
>> output is also stored in HDFS.
>>
>> The problem we are facing is that messages are read from Kafka and
>> processed (we saw it in the executor logs); it is just that the output
>> is not getting saved to HDFS for streams.
>>
>> I modified the code as suggested, but still the same problem. The
>> windowed output is not saved into HDFS.
>>
>> Thanks
>> Vikas Gupta
>>
> --
> Jean-Baptiste Onofré
> [email protected]
> http://blog.nanthrax.net
> Talend - http://www.talend.com
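For reference, the "stay in the GlobalWindow but add a trigger" suggestion at the top of the thread could look roughly like the sketch below (Beam Java SDK). This is an illustration, not the poster's actual code: `lines` and `output` are hypothetical names standing in for the Kafka-sourced PCollection and the HDFS output path, and the one-minute firing interval is an arbitrary choice.

```java
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

// Sketch: keep the default GlobalWindow, but fire a processing-time
// trigger periodically so the windowed write actually emits panes.
// Without a trigger, output would only appear at the end of the global
// window, which is never reached for an unbounded Kafka source.
PCollection<String> triggered = lines.apply(
    Window.<String>configure()
        .triggering(Repeatedly.forever(
            AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardMinutes(1))))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes());   // each pane writes only new elements

triggered.apply(
    TextIO.write()
        .to(output)              // hypothetical HDFS output prefix
        .withWindowedWrites()    // required for unbounded PCollections
        .withNumShards(1));
```

`discardingFiredPanes()` means each periodic firing writes only the elements that arrived since the previous firing; `accumulatingFiredPanes()` would instead re-emit everything seen so far in the window each time.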
