Hi,
Today I was debugging my code in Eclipse. When I submit the job to the Spark cluster, after a moment I see the following error message:
2017-03-20 16:37:44 ERROR 
org.apache.spark.Logging$class.logError(Logging.scala:74) task-result-getter-3 
[Task 31.0 in stage 109.0 (TID 848) had a not serializable result: 
org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers
Serialization stack:
    - object not serializable (class: 
org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers,
 value: 
org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers@658604bb)
    - field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
    - object (class scala.Tuple2, 
(org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers@658604bb,[]))
    - field (class: scala.Tuple2, name: _2, type: class java.lang.Object)
    - object (class scala.Tuple2, 
(org.apache.beam.runners.spark.util.ByteArray@107f,(org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers@658604bb,[])));
 not retrying]
2017-03-20 16:37:44 ERROR 
org.apache.spark.Logging$class.logError(Logging.scala:95) JobScheduler [Error 
running job streaming job 1489998839500 ms.0]
org.apache.spark.SparkException: Job aborted due to stage failure: Task 31.0 in 
stage 109.0 (TID 848) had a not serializable result: 
org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers
Serialization stack:
    - object not serializable (class: 
org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers,
 value: 
org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers@658604bb)
    - field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
    - object (class scala.Tuple2, 
(org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers@658604bb,[]))
    - field (class: scala.Tuple2, name: _2, type: class java.lang.Object)
    - object (class scala.Tuple2, 
(org.apache.beam.runners.spark.util.ByteArray@107f,(org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet$StateAndTimers@658604bb,[])))
    at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
 ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
    at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
 ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
    at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
 ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
    at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
~[scala-library-2.10.4.jar:?]
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
~[scala-library-2.10.4.jar:?]
    at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418) 
~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
    at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
 ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
    at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
 ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
    at scala.Option.foreach(Option.scala:236) ~[scala-library-2.10.4.jar:?]
    at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
 ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
    at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
 ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
    at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
 ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
    at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
 ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620) 
~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832) 
~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845) 
~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858) 
~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929) 
~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
    at 
org.apache.spark.streaming.dstream.DStream$$anonfun$2.apply(DStream.scala:446) 
~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
    at 
org.apache.spark.streaming.dstream.DStream$$anonfun$2.apply(DStream.scala:444) 
~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
    at scala.util.Try$.apply(Try.scala:161) ~[scala-library-2.10.4.jar:?]
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39) 
~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
    at 
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
 ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
    at 
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
 ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
    at 
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
 ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) 
~[scala-library-2.10.4.jar:?]
    at 
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
 ~[spark-assembly-1.6.3-hadoop2.6.0.jar:1.6.3]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) 
~[?:1.7.0_67]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) 
~[?:1.7.0_67]
  at java.lang.Thread.run(Unknown Source) ~[?:1.7.0_67]
Please help me to resolve this problem. Thanks.

From: Jiyu JY2 Shi
Sent: Friday, March 17, 2017 5:22 PM
To: 'dev@beam.apache.org'
Subject: Problems using Beam on a Spark cluster

Hi,
  I am using Beam on a Spark cluster. The application is below:
SparkPipelineOptions options = 
PipelineOptionsFactory.as(SparkPipelineOptions.class);
        options.setRunner(SparkRunner.class);
        options.setEnableSparkMetricSinks(false);
        options.setStreaming(true);
        options.setSparkMaster("spark://10.100.124.205:6066");
        options.setAppName("Beam App Spark"+new Random().nextFloat());
        options.setJobName("Beam Job Spark"+new Random().nextFloat());
        System.out.println("App Name:"+options.getAppName());
        System.out.println("Job Name:"+options.getJobName());
        options.setMaxRecordsPerBatch(100000L);

//      PipelineOptions options = PipelineOptionsFactory.create();
        Pipeline p = Pipeline.create(options);

//      Duration size = Duration.standardMinutes(4);
        long duration = 60;
        if(args!=null && args.length==1){
            duration = Integer.valueOf(args[0]);
        }
        Duration size = Duration.standardSeconds(duration);
        System.out.println("Window size: [" + duration + "] seconds");
        Window.Bound<KV<String,String>> fixWindow = Window.<KV<String,String>> 
into(
            FixedWindows.of(size)
        );

        String kafkaAddress = "10.100.124.208:9093";
//      String kafkaAddress = "192.168.100.212:9092";

        Map<String, Object> kfConsunmerConf = new HashMap<String, Object>();
        kfConsunmerConf.put("auto.offset.reset", "latest");
        PCollection<String> kafkaJsonPc = p.apply(KafkaIO.<String, String> 
read()
            .withBootstrapServers(kafkaAddress)
            .withTopics(ImmutableList.of("wypxx1"))
            .withKeyCoder(StringUtf8Coder.of())
            .withValueCoder(StringUtf8Coder.of())
            .updateConsumerProperties(kfConsunmerConf)
            .withoutMetadata()
        ).apply(Values.<String> create());


        PCollection<KV<String,String>> totalPc = kafkaJsonPc.apply(
                "count line",
                ParDo.of(new DoFn<String,KV<String,String>>() {
                    @ProcessElement
                      public void processElement(ProcessContext c) {
                        String line = c.element();
                        Instant is = c.timestamp();
                        if(line.length()>2)
                          line = line.substring(0,2);
                        System.out.println(line + " " +  is.toString());
                        c.output(KV.of(line, line));
                      }
                 })
            );


        PCollection<KV<String, Iterable<String>>> itPc = 
totalPc.apply(fixWindow).apply(
                "group by appKey",
                GroupByKey.<String, String>create()
            );
          itPc.apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, Void>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                    KV<String, Iterable<String>> keyIt = c.element();
                    String key = keyIt.getKey();
                    Iterable<String> itb = keyIt.getValue();
                    Iterator<String> it = itb.iterator();
                    StringBuilder sb = new StringBuilder();
                    sb.append(key).append(":[");
                    while(it.hasNext()){
                        sb.append(it.next()).append(",");
                    }
                    String str = sb.toString();

                    str = str.substring(0,str.length() -1) + "]";
                    System.out.println(str);
                    String filePath = "/data/wyp/sparktest.txt";
                    String line = "word-->["+key+"]total count="+str+"--->time+"+c.timestamp().toString();
                    System.out.println("writefile----->"+line);
                    FileUtil.write(filePath, line, true, true);
                }

             }));

            p.run().waitUntilFinish();
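
For completeness, FileUtil is just a small helper class of mine that appends a line of text to a local file; it is roughly equivalent to the sketch below (details simplified here, the parameter meanings follow the call site above):

import java.io.FileWriter;
import java.io.IOException;

public final class FileUtil {
    // Append a line of text to the given file.
    // Parameters as used above: (filePath, line, append, addNewline).
    public static void write(String filePath, String line, boolean append, boolean addNewline) {
        try (FileWriter fw = new FileWriter(filePath, append)) {
            fw.write(addNewline ? line + System.lineSeparator() : line);
        } catch (IOException e) {
            throw new RuntimeException("Failed to write to " + filePath, e);
        }
    }
}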

When I submit the application to the Spark cluster, in the Spark UI I can see the log output from the totalPc PCollection after about one minute, but I cannot see any log output from the itPc PCollection.
When I run Spark in local mode, it works well.
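For local mode I only change the master URL and leave the rest of the options unchanged, for example (the exact local URL here is just an illustration):
        options.setSparkMaster("local[4]");   // run the pipeline on a local, in-process Spark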
Please help me to resolve this problem. Thanks!
