Can you check the logs on the worker?

On Wed, Jun 13, 2018 at 2:26 AM <linr...@itri.org.tw> wrote:
Dear all,

I am using KafkaIO in my project (Beam 2.0.0 with the Spark runner).

My running environment is:

OS: Ubuntu 14.04.3 LTS

The versions of the tools are:

Java: JDK 1.8
Beam: 2.0.0 (Spark runner in standalone mode)
Spark: 1.6.0
Standalone mode: one driver node (ubuntu7), one master node (ubuntu8), and two worker nodes (ubuntu8 and ubuntu9)
Kafka: 2.10-0.10.1.1

The Java code of my project is:

==============================================================================
SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
options.setSparkMaster("spark://ubuntu8:7077");
options.setAppName("App kafkaBeamTest");
options.setJobName("Job kafkaBeamTest");
options.setMaxRecordsPerBatch(1000L);

Pipeline p = Pipeline.create(options);

System.out.println("Beamtokafka");
PCollection<KV<Long, String>> readData = p.apply(KafkaIO.<Long, String>read()
    .withBootstrapServers("ubuntu7:9092")
    .withTopic("kafkasink")
    .withKeyDeserializer(LongDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    .withoutMetadata());

PCollection<KV<Long, String>> readDivideData = readData.apply(
    Window.<KV<Long, String>>into(FixedWindows.of(Duration.standardSeconds(1)))
        .triggering(AfterWatermark.pastEndOfWindow()
            .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.ZERO)))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes());

System.out.println("CountData");

PCollection<KV<Long, Long>> countData = readDivideData.apply(Count.perKey());

p.run();
==============================================================================
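(Side note while debugging: countData above is computed but never written or logged anywhere, so the per-window counts are not visible on the cluster. Below is a minimal sketch of a logging step that could be added before p.run(); the "LogCounts" name and the System.out output are illustrative assumptions, not part of the original pipeline.)

// Assumed imports for the sketch (in addition to those the pipeline already uses):
// import org.apache.beam.sdk.transforms.DoFn;
// import org.apache.beam.sdk.transforms.ParDo;
// import org.apache.beam.sdk.values.KV;

// Hypothetical debugging step: print each per-key count produced by Count.perKey(),
// one line per key per fired 1-second window.
countData.apply("LogCounts", ParDo.of(new DoFn<KV<Long, Long>, Void>() {
  @ProcessElement
  public void processElement(ProcessContext c) {
    System.out.println("key=" + c.element().getKey()
        + ", count=" + c.element().getValue());
  }
}));

// Would replace the plain p.run() above: keep the driver attached until the
// streaming pipeline terminates.
p.run().waitUntilFinish();

On a standalone cluster, output printed from a DoFn typically ends up in each worker's work/<app-id>/<executor-id>/stdout, alongside the executor stderr logs.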
The error message is:

==============================================================================
Exception in thread "streaming-job-executor-0" java.lang.Error: java.lang.InterruptedException
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1155)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.InterruptedException
        at java.lang.Object.wait(Native Method)
        at java.lang.Object.wait(Object.java:502)
        at org.apache.spark.scheduler.JobWaiter.awaitResult(JobWaiter.scala:73)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:612)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:912)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:910)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
        at org.apache.spark.rdd.RDD.foreach(RDD.scala:910)
        …
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        ... 2 more
==============================================================================

I am building with Maven 3.5.0; the related dependencies in my project's pom.xml are:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-core</artifactId>
  <version>2.0.0</version>
</dependency>
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-kafka</artifactId>
  <version>2.0.0</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>1.6.0</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.10</artifactId>
  <version>1.6.0</version>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.10.1.1</version>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.10</artifactId>
  <version>0.10.1.1</version>
</dependency>

When I run the above code with the Spark runner in local mode (local[4]), the project works well (2,000~4,000 records/s). However, when I run it in standalone mode, it fails with the error above.

If you have any idea about this error in thread "streaming-job-executor-0", I am looking forward to hearing from you.

Note: the submit command is "./spark-submit --class com.itri.beam.kafkatest --master spark://ubuntu8:7077 /root/BeamKafkaAdvanced-0.1-shaded.jar --runner=SparkRunner".

Thanks

Rick

--
This email may contain confidential information. Please do not use or disclose it in any way and delete it if you are not the intended recipient.