Dear all,

I am using KafkaIO in my project (Beam 2.0.0 with the Spark runner).
My running environment is:
OS: Ubuntu 14.04.3 LTS
Tool versions:
Java: JDK 1.8
Beam 2.0.0 (Spark runner, standalone mode)
Spark 1.6.0
Standalone mode: one driver node (ubuntu7), one master node (ubuntu8), and two worker nodes (ubuntu8 and ubuntu9)
Kafka: 2.10-0.10.1.1

The Java code of my project is:
==============================================================================
SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
options.setSparkMaster("spark://ubuntu8:7077");
options.setAppName("App kafkaBeamTest");
options.setJobName("Job kafkaBeamTest");
options.setMaxRecordsPerBatch(1000L);

Pipeline p = Pipeline.create(options);

System.out.println("Beamtokafka");
PCollection<KV<Long, String>> readData = p.apply(KafkaIO.<Long, String>read()
    .withBootstrapServers("ubuntu7:9092")
    .withTopic("kafkasink")
    .withKeyDeserializer(LongDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    .withoutMetadata());

PCollection<KV<Long, String>> readDivideData = readData.apply(
    Window.<KV<Long, String>>into(FixedWindows.of(Duration.standardSeconds(1)))
        .triggering(AfterWatermark.pastEndOfWindow()
            .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.ZERO)))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes());

System.out.println("CountData");

PCollection<KV<Long, Long>> countData = readDivideData.apply(Count.perKey());

p.run();
==============================================================================
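
A minimal way to make the counts visible and to block until the job terminates would be something like the following (illustrative only, not part of the failing job above; it needs org.apache.beam.sdk.transforms.ParDo and DoFn on the classpath):
==============================================================================
// Log every windowed count to stdout (for inspection only).
countData.apply(ParDo.of(new DoFn<KV<Long, Long>, Void>() {
  @ProcessElement
  public void processElement(ProcessContext c) {
    System.out.println("key=" + c.element().getKey() + ", count=" + c.element().getValue());
  }
}));

// Block until the streaming job finishes, instead of the bare p.run() above.
p.run().waitUntilFinish();
==============================================================================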

The error message is:
==============================================================================
Exception in thread "streaming-job-executor-0" java.lang.Error: java.lang.InterruptedException
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1155)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.InterruptedException
        at java.lang.Object.wait(Native Method)
        at java.lang.Object.wait(Object.java:502)
        at org.apache.spark.scheduler.JobWaiter.awaitResult(JobWaiter.scala:73)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:612)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:912)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:910)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
        at org.apache.spark.rdd.RDD.foreach(RDD.scala:910)
…
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        ... 2 more
==============================================================================

I build with Maven 3.5.0; the related dependencies in my project's pom.xml are:
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-core</artifactId>
  <version>2.0.0</version>
</dependency>
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-kafka</artifactId>
  <version>2.0.0</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>1.6.0</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.10</artifactId>
  <version>1.6.0</version>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.10.1.1</version>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.10</artifactId>
  <version>0.10.1.1</version>
</dependency>


When I run the above code with the Spark runner in local mode (local[4]), the project works well (roughly 2000~4000 records/s). However, when I run it in standalone mode, it fails with the error above.
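
For comparison, the only thing I change for the local test is the master setting, roughly like this:
==============================================================================
// Same pipeline as above; only the Spark master differs for the local run.
SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
options.setSparkMaster("local[4]");   // instead of "spark://ubuntu8:7077"
==============================================================================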

If you have any idea what causes this error in the "streaming-job-executor-0" thread, I look forward to hearing from you.

Note: the submit command is "./spark-submit --class com.itri.beam.kafkatest --master spark://ubuntu8:7077 /root/BeamKafkaAdvanced-0.1-shaded.jar --runner=SparkRunner".
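
Since I build the options in code with PipelineOptionsFactory.as(...), I suspect the --runner=SparkRunner flag on the command line has no effect; a sketch of reading the options from the arguments instead (assuming a standard main(String[] args)) would be:
==============================================================================
// Parse --runner=SparkRunner, --sparkMaster=..., etc. from the spark-submit arguments.
SparkPipelineOptions options = PipelineOptionsFactory
    .fromArgs(args)
    .withValidation()
    .as(SparkPipelineOptions.class);
Pipeline p = Pipeline.create(options);
==============================================================================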

Thanks

Rick




--
This email may contain confidential information. Please do not use or disclose it in any way and delete it if you are not the intended recipient.
