[ https://issues.apache.org/jira/browse/BEAM-1789?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ismaël Mejía reassigned BEAM-1789:
----------------------------------

    Assignee:     (was: Amit Sela)

> can't use window in spark cluster mode
> ---------------------------------------
>
>                 Key: BEAM-1789
>                 URL: https://issues.apache.org/jira/browse/BEAM-1789
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-spark
>            Reporter: tianyou
>            Priority: Major
>
> I use Beam in a Spark cluster. The application is below.
>
> SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
> options.setRunner(SparkRunner.class);
> options.setEnableSparkMetricSinks(false);
> options.setStreaming(true);
> options.setSparkMaster("spark://10.100.124.205:6066");
> options.setAppName("Beam App Spark" + new Random().nextFloat());
> options.setJobName("Beam Job Spark" + new Random().nextFloat());
> System.out.println("App Name:" + options.getAppName());
> System.out.println("Job Name:" + options.getJobName());
> options.setMaxRecordsPerBatch(100000L);
>
> // PipelineOptions options = PipelineOptionsFactory.create();
> Pipeline p = Pipeline.create(options);
>
> // Duration size = Duration.standardMinutes(4);
> long duration = 60;
> if (args != null && args.length == 1) {
>     duration = Integer.valueOf(args[0]);
> }
> Duration size = Duration.standardSeconds(duration);
> System.out.println("The time window is [" + duration + "] seconds");
> Window.Bound<KV<String, String>> fixWindow = Window.<KV<String, String>>into(
>     FixedWindows.of(size)
> );
>
> String kafkaAddress = "10.100.124.208:9093";
> // String kafkaAddress = "192.168.100.212:9092";
>
> Map<String, Object> kfConsumerConf = new HashMap<String, Object>();
> kfConsumerConf.put("auto.offset.reset", "latest");
> PCollection<String> kafkaJsonPc = p.apply(KafkaIO.<String, String>read()
>     .withBootstrapServers(kafkaAddress)
>     .withTopics(ImmutableList.of("wypxx1"))
>     .withKeyCoder(StringUtf8Coder.of())
>     .withValueCoder(StringUtf8Coder.of())
>     .updateConsumerProperties(kfConsumerConf)
>     .withoutMetadata()
> ).apply(Values.<String>create());
>
> PCollection<KV<String, String>> totalPc = kafkaJsonPc.apply(
>     "count line",
>     ParDo.of(new DoFn<String, KV<String, String>>() {
>         @ProcessElement
>         public void processElement(ProcessContext c) {
>             String line = c.element();
>             Instant is = c.timestamp();
>             if (line.length() > 2)
>                 line = line.substring(0, 2);
>             System.out.println(line + " " + is.toString());
>             c.output(KV.of(line, line));
>         }
>     })
> );
>
> PCollection<KV<String, Iterable<String>>> itPc = totalPc.apply(fixWindow).apply(
>     "group by appKey",
>     GroupByKey.<String, String>create()
> );
> itPc.apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, Void>() {
>     @ProcessElement
>     public void processElement(ProcessContext c) {
>         KV<String, Iterable<String>> keyIt = c.element();
>         String key = keyIt.getKey();
>         Iterable<String> itb = keyIt.getValue();
>         Iterator<String> it = itb.iterator();
>         StringBuilder sb = new StringBuilder();
>         sb.append(key).append(":[");
>         while (it.hasNext()) {
>             sb.append(it.next()).append(",");
>         }
>         String str = sb.toString();
>         str = str.substring(0, str.length() - 1) + "]";
>         System.out.println(str);
>         String filePath = "/data/wyp/sparktest.txt";
>         String line = "word-->[" + key + "]total count=" + str + "--->time+" + c.timestamp().toString();
>         System.out.println("writefile----->" + line);
>         FileUtil.write(filePath, line, true, true);
>     }
> }));
>
> p.run().waitUntilFinish();
>
> When I submit the application to the Spark cluster, in the Spark UI I can see the log output of the totalPc PCollection, but even after one minute I cannot see any output from the itPc PCollection.
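> A note that may help isolate the problem: with Beam's default trigger, GroupByKey emits the pane for a fixed window only after the watermark passes the end of that window, so if the watermark never advances on the cluster, itPc would stay silent even while totalPc keeps logging. Below is a minimal sketch of the same window with explicit early firings (an illustrative debugging variant against the same pre-2.0 Beam API as the code above, not a confirmed fix; the 10-second delay is an arbitrary choice):
>
> import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
> import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
> import org.apache.beam.sdk.transforms.windowing.FixedWindows;
> import org.apache.beam.sdk.transforms.windowing.Window;
> import org.joda.time.Duration;
>
> // Sketch only: in addition to the final on-time pane, fire an early pane
> // every 10 seconds of processing time after the first element arrives,
> // so output appears even if the watermark is not advancing on the cluster.
> Window.Bound<KV<String, String>> eagerWindow =
>     Window.<KV<String, String>>into(FixedWindows.of(size))
>         .triggering(AfterWatermark.pastEndOfWindow()
>             .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
>                 .plusDelayOf(Duration.standardSeconds(10))))
>         .withAllowedLateness(Duration.ZERO)
>         .accumulatingFiredPanes();
>
> If the early panes show up in the itPc logs but the on-time pane never does, that would point at the watermark not advancing in the cluster deployment rather than at the GroupByKey itself.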
> When I use Spark in local mode, it works well.
> Please help me to resolve this problem. Thanks!

--
This message was sent by Atlassian Jira
(v8.3.4#803005)