Hi,

Today I was debugging my code in Eclipse. When I submit it to the Spark cluster, after a moment I see the error messages below:

2017-03-20 16:37:44 ERROR org.apache.spark.Logging$class.logError(Logging.scala:74) task-result-getter-3 [Task 31.0 in stage 109.0 (TID 848) had a not serializable result: org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers
Serialization stack:
    - object not serializable (class: org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers, value: org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers@658604bb)
    - field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
    - object (class scala.Tuple2, (org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers@658604bb,[]))
    - field (class: scala.Tuple2, name: _2, type: class java.lang.Object)
    - object (class scala.Tuple2, (org.apache.beam.runners.spark.util.ByteArray@107f,(org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers@658604bb,[]))); not retrying]

2017-03-20 16:37:44 ERROR org.apache.spark.Logging$class.logError(Logging.scala:95) JobScheduler [Error running job streaming job 1489998839500 ms.0]
org.apache.spark.SparkException: Job aborted due to stage failure: Task 31.0 in stage 109.0 (TID 848) had a not serializable result: org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers
Serialization stack:
    - object not serializable (class: org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers, value: org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers@658604bb)
    - field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
    - object (class scala.Tuple2, (org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers@658604bb,[]))
    - field (class: scala.Tuple2, name: _2, type: class java.lang.Object)
    - object (class scala.Tuple2, (org.apache.beam.runners.spark.util.ByteArray@107f,(org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers@658604bb,[])))
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431) ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419) ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418) ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) ~[scala-library-2.10.4.jar:?]
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) ~[scala-library-2.10.4.jar:?]
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418) ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
    at scala.Option.foreach(Option.scala:236) ~[scala-library-2.10.4.jar:?]
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799) ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640) ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599) ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588) ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) [spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620) ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832) ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845) ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858) ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929) ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
    at org.apache.spark.streaming.dstream.DStream$$anonfun$2.apply(DStream.scala:446) ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
    at org.apache.spark.streaming.dstream.DStream$$anonfun$2.apply(DStream.scala:444) ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
    at scala.util.Try$.apply(Try.scala:161) ~[scala-library-2.10.4.jar:?]
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39) ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224) ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224) ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224) ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) ~[scala-library-2.10.4.jar:?]
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223) ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:1.7.0_67]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:1.7.0_67]
    at java.lang.Thread.run(Unknown Source) ~[?:1.7.0_67]

Please help me resolve this problem. Thanks.
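P.S. For reference, the log says org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers is not serializable, and "had a not serializable result" means Spark tried to ship the task result back with Java serialization. One thing I plan to try (only a guess on my side, using standard Spark settings, not something confirmed for this error) is to force Kryo serialization before the runner creates its SparkContext, roughly like this:

// Sketch of a guess, not a confirmed fix: force Kryo instead of Java serialization.
// SparkConf picks up "spark.*" JVM system properties by default, so setting this at
// the very top of main(), before the SparkRunner builds its SparkContext, should take
// effect; the same setting can also be passed to spark-submit as
// --conf spark.serializer=org.apache.spark.serializer.KryoSerializer
System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer");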
From: Jiyu JY2 Shi
Sent: Friday, March 17, 2017 5:22 PM
To: 'dev@beam.apache.org'
Subject: Using Beam on a Spark cluster: some problems

Hi,

I am using Beam on a Spark cluster. The application is below:

SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
options.setEnableSparkMetricSinks(false);
options.setStreaming(true);
options.setSparkMaster("spark://10.100.124.205:6066");
options.setAppName("Beam App Spark" + new Random().nextFloat());
options.setJobName("Beam Job Spark" + new Random().nextFloat());
System.out.println("App Name:" + options.getAppName());
System.out.println("Job Name:" + options.getJobName());
options.setMaxRecordsPerBatch(100000L);
// PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);

// Duration size = Duration.standardMinutes(4);
long duration = 60;
if (args != null && args.length == 1) {
    duration = Integer.valueOf(args[0]);
}
Duration size = Duration.standardSeconds(duration);
System.out.println("Time window: [" + duration + "] seconds");

Window.Bound<KV<String, String>> fixWindow = Window.<KV<String, String>>into(
    FixedWindows.of(size)
);

String kafkaAddress = "10.100.124.208:9093";
// String kafkaAddress = "192.168.100.212:9092";
Map<String, Object> kfConsunmerConf = new HashMap<String, Object>();
kfConsunmerConf.put("auto.offset.reset", "latest");

PCollection<String> kafkaJsonPc = p.apply(KafkaIO.<String, String>read()
    .withBootstrapServers(kafkaAddress)
    .withTopics(ImmutableList.of("wypxx1"))
    .withKeyCoder(StringUtf8Coder.of())
    .withValueCoder(StringUtf8Coder.of())
    .updateConsumerProperties(kfConsunmerConf)
    .withoutMetadata()
).apply(Values.<String>create());

PCollection<KV<String, String>> totalPc = kafkaJsonPc.apply(
    "count line",
    ParDo.of(new DoFn<String, KV<String, String>>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            String line = c.element();
            Instant is = c.timestamp();
            if (line.length() > 2)
                line = line.substring(0, 2);
            System.out.println(line + " " + is.toString());
            c.output(KV.of(line, line));
        }
    })
);

PCollection<KV<String, Iterable<String>>> itPc = totalPc.apply(fixWindow).apply(
    "group by appKey",
    GroupByKey.<String, String>create()
);

itPc.apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, Void>() {
    @ProcessElement
    public void processElement(ProcessContext c) {
        KV<String, Iterable<String>> keyIt = c.element();
        String key = keyIt.getKey();
        Iterable<String> itb = keyIt.getValue();
        Iterator<String> it = itb.iterator();
        StringBuilder sb = new StringBuilder();
        sb.append(key).append(":[");
        while (it.hasNext()) {
            sb.append(it.next()).append(",");
        }
        String str = sb.toString();
        str = str.substring(0, str.length() - 1) + "]";
        System.out.println(str);
        String filePath = "/data/wyp/sparktest.txt";
        String line = "word-->[" + key + "]total count=" + str + "--->time+" + c.timestamp().toString();
        System.out.println("writefile----->" + line);
        FileUtil.write(filePath, line, true, true);
    }
}));

p.run().waitUntilFinish();

When I submit the application to the Spark cluster, in the Spark UI I can see the log output from the totalPc PCollection, but even after one minute I cannot see any log output from the itPc PCollection. When I run Spark in local mode, it works well. Please help me resolve this problem. Thanks!
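One more note on where I am looking for the output (this is my own understanding, please correct me if it is wrong): the DoFn that consumes itPc runs on the Spark executors, so its System.out.println output and the file written by FileUtil.write should end up in each worker's stdout and local filesystem, not on the driver or on the machine I submit from. A small sketch of what I mean, using an SLF4J logger instead of System.out so the messages show up in the executor logs linked from the Spark UI (the logger-based DoFn below is only an illustration, not part of my pipeline):

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Illustration only: same shape as the DoFn applied to itPc above, but logging
// through SLF4J. On a cluster this code runs inside the executors, so the output
// appears in each executor's log (reachable from the Spark UI), not on the driver.
public class LogWindowContentsFn extends DoFn<KV<String, Iterable<String>>, Void> {
    private static final Logger LOG = LoggerFactory.getLogger(LogWindowContentsFn.class);

    @ProcessElement
    public void processElement(ProcessContext c) {
        LOG.info("key=[{}] values={} timestamp={}",
            c.element().getKey(), c.element().getValue(), c.timestamp());
    }
}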