This seems to happen because TextIO.Read creates values in an empty window, while our unit tests use Create.of, which creates values in the global window. I fixed it for Combine.PerKey (I'll open a JIRA and PR tomorrow, and also add a test that reads from a file), but when I tried removing the Combine.PerKey support and letting it run as GroupByKey followed by Combine.GroupedValues, it uses GroupAlsoByWindowsViaOutputBufferDoFn (no direct Spark translation for this one) and returns an empty PCollection. Any ideas?
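For reference, here is a minimal sketch of the two input paths, so the difference is easy to reproduce in a runner test. It only builds the pipelines; the package and transform names are the com.google.cloud.dataflow.sdk ones visible in the log below, while the test wiring (class name, how you check the output) is an assumption, not the actual fix:

    import com.google.cloud.dataflow.sdk.Pipeline;
    import com.google.cloud.dataflow.sdk.io.TextIO;
    import com.google.cloud.dataflow.sdk.transforms.Count;
    import com.google.cloud.dataflow.sdk.transforms.Create;
    import com.google.cloud.dataflow.sdk.values.KV;
    import com.google.cloud.dataflow.sdk.values.PCollection;

    // Hypothetical test helper, for illustration only.
    public class WordCountInputPaths {

      // Existing unit tests build input with Create.of(...); those elements are
      // assigned to the global window, so Combine.PerKey behaves as expected.
      static PCollection<KV<String, Long>> fromCreate(Pipeline p) {
        return p
            .apply(Create.of("hi there", "hi"))
            .apply(Count.<String>perElement());
      }

      // Reading the same input through TextIO.Read exercises the path that
      // currently comes out empty on the Spark runner.
      static PCollection<KV<String, Long>> fromTextFile(Pipeline p, String path) {
        return p
            .apply(TextIO.Read.from(path))
            .apply(Count.<String>perElement());
      }
    }

A file-based test along these lines should catch the windowing difference that Create.of-only tests miss.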
On Tue, Apr 12, 2016 at 2:25 PM Amit Sela <[email protected]> wrote:

> Yep. It seems like you're right. Let me dig into that and get back to you
> with an answer.
> This was definitely working after the code drop and README re-write (I
> tested both mvn exec:exec and running on cluster).
>
> Thanks,
> Amit
>
> On Tue, Apr 12, 2016 at 1:52 PM Jean-Baptiste Onofré <[email protected]>
> wrote:
>
>> Hi,
>>
>> thanks for the update.
>>
>> Did you try -Doutput=~/test/beam/output (or is it a copy/paste mistake)?
>>
>> Let me try to reproduce and check the pipeline definition.
>>
>> Thanks,
>> Regards
>> JB
>>
>> On 04/12/2016 12:41 PM, Jianfeng Qian wrote:
>> > Hi,
>> >
>> > I just started to use Beam.
>> >
>> > I had installed Scala 2.11, Hadoop 2.7.2, Spark 1.6.1 and started hadoop and spark.
>> >
>> > I downloaded and built Beam.
>> >
>> > I downloaded the input file with:
>> >
>> > curl http://www.gutenberg.org/cache/epub/1128/pg1128.txt > /home/jeff/test/beam/input/kinglear.txt
>> >
>> > jeff@T:~/test/beam$ ls -l input
>> > total 184
>> > -rw-rw-r-- 1 jeff jeff 185965 Apr 11 17:16 kinglear.txt
>> >
>> > *Then I ran the local mode:*
>> >
>> > jeff@T:~/git/incubator-beam/runners/spark$ mvn exec:exec -DmainClass=com.google.cloud.dataflow.examples.WordCount \
>> >   -Dinput=~/test/beam/input/kinglear.txt -Doutput=/test/beam/output -Drunner=SparkPipelineRunner \
>> >   -DsparkMaster=local
>> >
>> > *However, the result file is empty. Has anyone faced the same problem?*
>> >
>> > jeff@T:~/test/beam$ ls -l
>> > total 4
>> > drwxrwxr-x 2 jeff jeff 4096 Apr 12 17:32 input
>> > -rw-r--r-- 1 jeff jeff 0 Apr 12 18:20 output-00000-of-00001
>> > -rw-r--r-- 1 jeff jeff 0 Apr 12 18:20 _SUCCESS
>> >
>> > The local mode log is as follows:
>> >
>> > [INFO] Scanning for projects...
>> > [WARNING] The POM for org.eclipse.m2e:lifecycle-mapping:jar:1.0.0 is missing, no dependency information available
>> > [WARNING] Failed to retrieve plugin descriptor for org.eclipse.m2e:lifecycle-mapping:1.0.0: Plugin org.eclipse.m2e:lifecycle-mapping:1.0.0 or one of its dependencies could not be resolved: Failure to find org.eclipse.m2e:lifecycle-mapping:jar:1.0.0 in https://repo.maven.apache.org/maven2 was cached in the local repository, resolution will not be reattempted until the update interval of central has elapsed or updates are forced
>> > [INFO]
>> > [INFO] ------------------------------------------------------------------------
>> > [INFO] Building Apache Beam :: Runners :: Spark 0.1.0-incubating-SNAPSHOT
>> > [INFO] ------------------------------------------------------------------------
>> > [WARNING] The POM for org.eclipse.m2e:lifecycle-mapping:jar:1.0.0 is missing, no dependency information available
>> > [WARNING] Failed to retrieve plugin descriptor for org.eclipse.m2e:lifecycle-mapping:1.0.0: Plugin org.eclipse.m2e:lifecycle-mapping:1.0.0 or one of its dependencies could not be resolved: Failure to find org.eclipse.m2e:lifecycle-mapping:jar:1.0.0 in https://repo.maven.apache.org/maven2 was cached in the local repository, resolution will not be reattempted until the update interval of central has elapsed or updates are forced
>> > [INFO]
>> > [INFO] --- exec-maven-plugin:1.4.0:exec (default-cli) @ spark-runner ---
>> > log4j:WARN No appenders could be found for logger (com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory).
>> > log4j:WARN Please initialize the log4j system properly. >> > log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig >> for >> > more info. >> > Using Spark's default log4j profile: >> > org/apache/spark/log4j-defaults.properties >> > 16/04/12 18:20:29 INFO SparkContext: Running Spark version 1.5.2 >> > 16/04/12 18:20:29 WARN NativeCodeLoader: Unable to load native-hadoop >> > library for your platform... using builtin-java classes where applicable >> > 16/04/12 18:20:29 WARN Utils: Your hostname, T resolves to a loopback >> > address: 127.0.1.1; using 192.168.1.119 instead (on interface wlp3s0) >> > 16/04/12 18:20:29 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to >> > another address >> > 16/04/12 18:20:29 INFO SecurityManager: Changing view acls to: jeff >> > 16/04/12 18:20:29 INFO SecurityManager: Changing modify acls to: jeff >> > 16/04/12 18:20:29 INFO SecurityManager: SecurityManager: authentication >> > disabled; ui acls disabled; users with view permissions: Set(jeff); >> > users with modify permissions: Set(jeff) >> > 16/04/12 18:20:31 INFO Slf4jLogger: Slf4jLogger started >> > 16/04/12 18:20:31 INFO Remoting: Starting remoting >> > 16/04/12 18:20:32 INFO Remoting: Remoting started; listening on >> > addresses :[akka.tcp://[email protected]:40821] >> > 16/04/12 18:20:32 INFO Utils: Successfully started service 'sparkDriver' >> > on port 40821. >> > 16/04/12 18:20:32 INFO SparkEnv: Registering MapOutputTracker >> > 16/04/12 18:20:32 INFO SparkEnv: Registering BlockManagerMaster >> > 16/04/12 18:20:32 INFO DiskBlockManager: Created local directory at >> > /tmp/blockmgr-12aaa425-2b1b-4182-865d-5d231ee10cda >> > 16/04/12 18:20:33 INFO MemoryStore: MemoryStore started with capacity >> > 441.7 MB >> > 16/04/12 18:20:33 INFO HttpFileServer: HTTP File server directory is >> > >> /tmp/spark-091800d6-b90b-48f6-9c8d-f3f01755f59b/httpd-bf2fd43c-b3c9-4840-bc72-e2f16965df9a >> > 16/04/12 18:20:33 INFO HttpServer: Starting HTTP Server >> > 16/04/12 18:20:33 INFO Utils: Successfully started service 'HTTP file >> > server' on port 46840. >> > 16/04/12 18:20:33 INFO SparkEnv: Registering OutputCommitCoordinator >> > 16/04/12 18:20:33 INFO Utils: Successfully started service 'SparkUI' on >> > port 4040. >> > 16/04/12 18:20:33 INFO SparkUI: Started SparkUI at >> http://192.168.1.119:4040 >> > 16/04/12 18:20:34 WARN MetricsSystem: Using default name DAGScheduler >> > for source because spark.app.id is not set. >> > 16/04/12 18:20:34 INFO Executor: Starting executor ID driver on host >> > localhost >> > 16/04/12 18:20:34 INFO Utils: Successfully started service >> > 'org.apache.spark.network.netty.NettyBlockTransferService' on port >> 43549. >> > 16/04/12 18:20:34 INFO NettyBlockTransferService: Server created on >> 43549 >> > 16/04/12 18:20:34 INFO BlockManagerMaster: Trying to register >> BlockManager >> > 16/04/12 18:20:34 INFO BlockManagerMasterEndpoint: Registering block >> > manager localhost:43549 with 441.7 MB RAM, BlockManagerId(driver, >> > localhost, 43549) >> > 16/04/12 18:20:34 INFO BlockManagerMaster: Registered BlockManager >> > 16/04/12 18:20:34 INFO SparkPipelineRunner$Evaluator: Entering >> > directly-translatable composite transform: 'ReadLines' >> > 16/04/12 18:20:34 INFO SparkPipelineRunner$Evaluator: Skipping >> > 'ReadLines/Read'; already in composite transform. 
>> > 16/04/12 18:20:34 INFO SparkPipelineRunner$Evaluator: Post-visiting >> > directly-translatable composite transform: 'ReadLines' >> > 16/04/12 18:20:34 INFO SparkPipelineRunner$Evaluator: Evaluating >> > ReadLines [TextIO.Read] >> > 16/04/12 18:20:35 INFO MemoryStore: ensureFreeSpace(110248) called with >> > curMem=0, maxMem=463176990 >> > 16/04/12 18:20:35 INFO MemoryStore: Block broadcast_0 stored as values >> > in memory (estimated size 107.7 KB, free 441.6 MB) >> > 16/04/12 18:20:35 INFO MemoryStore: ensureFreeSpace(10056) called with >> > curMem=110248, maxMem=463176990 >> > 16/04/12 18:20:35 INFO MemoryStore: Block broadcast_0_piece0 stored as >> > bytes in memory (estimated size 9.8 KB, free 441.6 MB) >> > 16/04/12 18:20:35 INFO BlockManagerInfo: Added broadcast_0_piece0 in >> > memory on localhost:43549 (size: 9.8 KB, free: 441.7 MB) >> > 16/04/12 18:20:35 INFO SparkContext: Created broadcast 0 from textFile >> > at TransformTranslator.java:471 >> > 16/04/12 18:20:35 INFO SparkPipelineRunner$Evaluator: Evaluating >> > ParDo(ExtractWords) >> > 16/04/12 18:20:35 INFO SparkPipelineRunner$Evaluator: Evaluating Init >> > [AnonymousParDo] >> > 16/04/12 18:20:35 INFO SparkPipelineRunner$Evaluator: Entering >> > directly-translatable composite transform: >> > 'WordCount.CountWords/Count.PerElement/Count.PerKey' >> > 16/04/12 18:20:35 INFO SparkPipelineRunner$Evaluator: Skipping >> > >> 'WordCount.CountWords/Count.PerElement/Count.PerKey/GroupByKey/GroupByKeyViaGroupByKeyOnly/GroupByKeyViaGroupByKeyOnly.ReifyTimestampsAndWindows/ParDo(ReifyTimestampAndWindows)'; >> > already in composite transform. >> > 16/04/12 18:20:35 INFO SparkPipelineRunner$Evaluator: Skipping >> > >> 'WordCount.CountWords/Count.PerElement/Count.PerKey/GroupByKey/GroupByKeyViaGroupByKeyOnly/GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly'; >> > already in composite transform. >> > 16/04/12 18:20:35 INFO SparkPipelineRunner$Evaluator: Skipping >> > >> 'WordCount.CountWords/Count.PerElement/Count.PerKey/GroupByKey/GroupByKeyViaGroupByKeyOnly/GroupByKeyViaGroupByKeyOnly.SortValuesByTimestamp/AnonymousParDo'; >> > already in composite transform. >> > 16/04/12 18:20:35 INFO SparkPipelineRunner$Evaluator: Skipping >> > >> 'WordCount.CountWords/Count.PerElement/Count.PerKey/GroupByKey/GroupByKeyViaGroupByKeyOnly/GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow/ParDo(GroupAlsoByWindowsViaOutputBuffer)'; >> > already in composite transform. >> > 16/04/12 18:20:35 INFO SparkPipelineRunner$Evaluator: Skipping >> > >> 'WordCount.CountWords/Count.PerElement/Count.PerKey/Combine.GroupedValues/AnonymousParDo'; >> > already in composite transform. >> > 16/04/12 18:20:35 INFO SparkPipelineRunner$Evaluator: Post-visiting >> > directly-translatable composite transform: >> > 'WordCount.CountWords/Count.PerElement/Count.PerKey' >> > 16/04/12 18:20:35 INFO SparkPipelineRunner$Evaluator: Evaluating >> > Count.PerKey [Combine.PerKey] >> > 16/04/12 18:20:36 INFO FileInputFormat: Total input paths to process : 1 >> > 16/04/12 18:20:36 INFO SparkPipelineRunner$Evaluator: Evaluating Map >> > [AnonymousParDo] >> > 16/04/12 18:20:36 INFO SparkPipelineRunner$Evaluator: Entering >> > directly-translatable composite transform: 'WriteCounts' >> > 16/04/12 18:20:36 INFO SparkPipelineRunner$Evaluator: Skipping >> > 'WriteCounts/Write/Create.Values'; already in composite transform. >> > 16/04/12 18:20:36 INFO SparkPipelineRunner$Evaluator: Skipping >> > 'WriteCounts/Write/Initialize'; already in composite transform. 
>> > 16/04/12 18:20:36 INFO SparkPipelineRunner$Evaluator: Skipping >> > 'WriteCounts/Write/View.AsSingleton/View.CreatePCollectionView'; already >> > in composite transform. >> > 16/04/12 18:20:36 INFO SparkPipelineRunner$Evaluator: Skipping >> > 'WriteCounts/Write/WriteBundles'; already in composite transform. >> > 16/04/12 18:20:36 INFO SparkPipelineRunner$Evaluator: Skipping >> > 'WriteCounts/Write/Window.Into()'; already in composite transform. >> > 16/04/12 18:20:36 INFO SparkPipelineRunner$Evaluator: Skipping >> > 'WriteCounts/Write/View.AsIterable/View.CreatePCollectionView'; already >> > in composite transform. >> > 16/04/12 18:20:36 INFO SparkPipelineRunner$Evaluator: Skipping >> > 'WriteCounts/Write/Finalize'; already in composite transform. >> > 16/04/12 18:20:36 INFO SparkPipelineRunner$Evaluator: Post-visiting >> > directly-translatable composite transform: 'WriteCounts' >> > 16/04/12 18:20:36 INFO SparkPipelineRunner$Evaluator: Evaluating >> > WriteCounts [TextIO.Write] >> > 16/04/12 18:20:36 INFO deprecation: mapred.output.dir is deprecated. >> > Instead, use mapreduce.output.fileoutputformat.outputdir >> > 16/04/12 18:20:36 INFO SparkContext: Starting job: >> > saveAsNewAPIHadoopFile at TransformTranslator.java:660 >> > 16/04/12 18:20:36 INFO DAGScheduler: Registering RDD 6 (mapToPair at >> > TransformTranslator.java:304) >> > 16/04/12 18:20:36 INFO DAGScheduler: Got job 0 (saveAsNewAPIHadoopFile >> > at TransformTranslator.java:660) with 1 output partitions >> > 16/04/12 18:20:36 INFO DAGScheduler: Final stage: ResultStage >> > 1(saveAsNewAPIHadoopFile at TransformTranslator.java:660) >> > 16/04/12 18:20:36 INFO DAGScheduler: Parents of final stage: >> > List(ShuffleMapStage 0) >> > 16/04/12 18:20:36 INFO DAGScheduler: Missing parents: >> > List(ShuffleMapStage 0) >> > 16/04/12 18:20:36 INFO DAGScheduler: Submitting ShuffleMapStage 0 >> > (MapPartitionsRDD[6] at mapToPair at TransformTranslator.java:304), >> > which has no missing parents >> > 16/04/12 18:20:36 INFO MemoryStore: ensureFreeSpace(10368) called with >> > curMem=120304, maxMem=463176990 >> > 16/04/12 18:20:36 INFO MemoryStore: Block broadcast_1 stored as values >> > in memory (estimated size 10.1 KB, free 441.6 MB) >> > 16/04/12 18:20:36 INFO MemoryStore: ensureFreeSpace(5001) called with >> > curMem=130672, maxMem=463176990 >> > 16/04/12 18:20:36 INFO MemoryStore: Block broadcast_1_piece0 stored as >> > bytes in memory (estimated size 4.9 KB, free 441.6 MB) >> > 16/04/12 18:20:36 INFO BlockManagerInfo: Added broadcast_1_piece0 in >> > memory on localhost:43549 (size: 4.9 KB, free: 441.7 MB) >> > 16/04/12 18:20:36 INFO SparkContext: Created broadcast 1 from broadcast >> > at DAGScheduler.scala:861 >> > 16/04/12 18:20:36 INFO DAGScheduler: Submitting 1 missing tasks from >> > ShuffleMapStage 0 (MapPartitionsRDD[6] at mapToPair at >> > TransformTranslator.java:304) >> > 16/04/12 18:20:36 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 >> tasks >> > 16/04/12 18:20:36 INFO TaskSetManager: Starting task 0.0 in stage 0.0 >> > (TID 0, localhost, PROCESS_LOCAL, 2142 bytes) >> > 16/04/12 18:20:36 INFO Executor: Running task 0.0 in stage 0.0 (TID 0) >> > 16/04/12 18:20:36 INFO HadoopRDD: Input split: >> > file:/home/jeff/test/beam/input/kinglear.txt:0+185965 >> > 16/04/12 18:20:36 INFO deprecation: mapred.tip.id is deprecated. >> > Instead, use mapreduce.task.id >> > 16/04/12 18:20:36 INFO deprecation: mapred.task.id is deprecated. 
>> > Instead, use mapreduce.task.attempt.id >> > 16/04/12 18:20:36 INFO deprecation: mapred.task.is.map is deprecated. >> > Instead, use mapreduce.task.ismap >> > 16/04/12 18:20:36 INFO deprecation: mapred.task.partition is deprecated. >> > Instead, use mapreduce.task.partition >> > 16/04/12 18:20:36 INFO deprecation: mapred.job.id is deprecated. >> > Instead, use mapreduce.job.id >> > 16/04/12 18:20:37 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). >> > 4316 bytes result sent to driver >> > 16/04/12 18:20:37 INFO TaskSetManager: Finished task 0.0 in stage 0.0 >> > (TID 0) in 477 ms on localhost (1/1) >> > 16/04/12 18:20:37 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose >> > tasks have all completed, from pool >> > 16/04/12 18:20:37 INFO DAGScheduler: ShuffleMapStage 0 (mapToPair at >> > TransformTranslator.java:304) finished in 0.511 s >> > 16/04/12 18:20:37 INFO DAGScheduler: looking for newly runnable stages >> > 16/04/12 18:20:37 INFO DAGScheduler: running: Set() >> > 16/04/12 18:20:37 INFO DAGScheduler: waiting: Set(ResultStage 1) >> > 16/04/12 18:20:37 INFO DAGScheduler: failed: Set() >> > 16/04/12 18:20:37 INFO DAGScheduler: Missing parents for ResultStage 1: >> > List() >> > 16/04/12 18:20:37 INFO DAGScheduler: Submitting ResultStage 1 >> > (MapPartitionsRDD[14] at mapToPair at TransformTranslator.java:486), >> > which is now runnable >> > 16/04/12 18:20:37 INFO MemoryStore: ensureFreeSpace(54976) called with >> > curMem=135673, maxMem=463176990 >> > 16/04/12 18:20:37 INFO MemoryStore: Block broadcast_2 stored as values >> > in memory (estimated size 53.7 KB, free 441.5 MB) >> > 16/04/12 18:20:37 INFO MemoryStore: ensureFreeSpace(19334) called with >> > curMem=190649, maxMem=463176990 >> > 16/04/12 18:20:37 INFO MemoryStore: Block broadcast_2_piece0 stored as >> > bytes in memory (estimated size 18.9 KB, free 441.5 MB) >> > 16/04/12 18:20:37 INFO BlockManagerInfo: Added broadcast_2_piece0 in >> > memory on localhost:43549 (size: 18.9 KB, free: 441.7 MB) >> > 16/04/12 18:20:37 INFO SparkContext: Created broadcast 2 from broadcast >> > at DAGScheduler.scala:861 >> > 16/04/12 18:20:37 INFO DAGScheduler: Submitting 1 missing tasks from >> > ResultStage 1 (MapPartitionsRDD[14] at mapToPair at >> > TransformTranslator.java:486) >> > 16/04/12 18:20:37 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 >> tasks >> > 16/04/12 18:20:37 INFO TaskSetManager: Starting task 0.0 in stage 1.0 >> > (TID 1, localhost, PROCESS_LOCAL, 1901 bytes) >> > 16/04/12 18:20:37 INFO Executor: Running task 0.0 in stage 1.0 (TID 1) >> > 16/04/12 18:20:37 INFO deprecation: mapreduce.outputformat.class is >> > deprecated. Instead, use mapreduce.job.outputformat.class >> > 16/04/12 18:20:37 INFO deprecation: mapred.output.key.class is >> > deprecated. Instead, use mapreduce.job.output.key.class >> > 16/04/12 18:20:37 INFO deprecation: mapred.output.value.class is >> > deprecated. Instead, use mapreduce.job.output.value.class >> > 16/04/12 18:20:37 INFO ShuffleBlockFetcherIterator: Getting 0 non-empty >> > blocks out of 1 blocks >> > 16/04/12 18:20:37 INFO ShuffleBlockFetcherIterator: Started 0 remote >> > fetches in 6 ms >> > 16/04/12 18:20:37 INFO FileOutputCommitter: Saved output of task >> > 'attempt_201604121820_0014_r_000000_0' to >> > file:/home/jeff/test/beam/_temporary/0/task_201604121820_0014_r_000000 >> > 16/04/12 18:20:37 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 
>> > 1326 bytes result sent to driver >> > 16/04/12 18:20:37 INFO TaskSetManager: Finished task 0.0 in stage 1.0 >> > (TID 1) in 126 ms on localhost (1/1) >> > 16/04/12 18:20:37 INFO DAGScheduler: ResultStage 1 >> > (saveAsNewAPIHadoopFile at TransformTranslator.java:660) finished in >> 0.127 s >> > 16/04/12 18:20:37 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose >> > tasks have all completed, from pool >> > 16/04/12 18:20:37 INFO DAGScheduler: Job 0 finished: >> > saveAsNewAPIHadoopFile at TransformTranslator.java:660, took 0.881797 s >> > 16/04/12 18:20:37 INFO SparkPipelineRunner: Pipeline execution complete. >> > 16/04/12 18:20:37 INFO SparkContext: Invoking stop() from shutdown hook >> > 16/04/12 18:20:37 INFO SparkUI: Stopped Spark web UI at >> > http://192.168.1.119:4040 >> > 16/04/12 18:20:37 INFO DAGScheduler: Stopping DAGScheduler >> > 16/04/12 18:20:37 INFO MapOutputTrackerMasterEndpoint: >> > MapOutputTrackerMasterEndpoint stopped! >> > 16/04/12 18:20:37 INFO MemoryStore: MemoryStore cleared >> > 16/04/12 18:20:37 INFO BlockManager: BlockManager stopped >> > 16/04/12 18:20:37 INFO BlockManagerMaster: BlockManagerMaster stopped >> > 16/04/12 18:20:37 INFO SparkContext: Successfully stopped SparkContext >> > 16/04/12 18:20:37 INFO >> > OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: >> > OutputCommitCoordinator stopped! >> > 16/04/12 18:20:37 INFO ShutdownHookManager: Shutdown hook called >> > 16/04/12 18:20:37 INFO ShutdownHookManager: Deleting directory >> > /tmp/spark-091800d6-b90b-48f6-9c8d-f3f01755f59b >> > [INFO] >> > ------------------------------------------------------------------------ >> > [INFO] BUILD SUCCESS >> > [INFO] >> > ------------------------------------------------------------------------ >> > [INFO] Total time: 14.206 s >> > [INFO] Finished at: 2016-04-12T18:20:37+08:00 >> > [INFO] Final Memory: 23M/167M >> > [INFO] >> > ------------------------------------------------------------------------ >> > >> > >> >> -- >> Jean-Baptiste Onofré >> [email protected] >> http://blog.nanthrax.net >> Talend - http://www.talend.com >> >
