Dear all,
I am using KafkaIO in my project (Beam 2.0.0 with the Spark runner).
My running environment is:
OS: Ubuntu 14.04.3 LTS
The versions of the relevant tools are:
Java: JDK 1.8
Beam 2.0.0 (Spark runner in standalone mode)
Spark 1.6.0
Standalone mode: one driver node (ubuntu7), one master node (ubuntu8), and two
worker nodes (ubuntu8 and ubuntu9)
Kafka: 2.10-0.10.1.1
The Java code of my project is:
==============================================================================
SparkPipelineOptions options =
    PipelineOptionsFactory.as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
options.setSparkMaster("spark://ubuntu8:7077");
options.setAppName("App kafkaBeamTest");
options.setJobName("Job kafkaBeamTest");
options.setMaxRecordsPerBatch(1000L);

Pipeline p = Pipeline.create(options);

System.out.println("Beamtokafka");
// Read <Long, String> records from Kafka; withoutMetadata() drops the Kafka
// metadata and yields plain KVs.
PCollection<KV<Long, String>> readData = p.apply(KafkaIO.<Long, String>read()
    .withBootstrapServers("ubuntu7:9092")
    .withTopic("kafkasink")
    .withKeyDeserializer(LongDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    .withoutMetadata());

// 1-second fixed windows, firing at the watermark, discarding fired panes.
PCollection<KV<Long, String>> readDivideData = readData.apply(
    Window.<KV<Long, String>>into(FixedWindows.of(Duration.standardSeconds(1)))
        .triggering(AfterWatermark.pastEndOfWindow()
            .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.ZERO)))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes());

System.out.println("CountData");
PCollection<KV<Long, Long>> countData = readDivideData.apply(Count.perKey());

p.run();
==============================================================================
The error message is:
==============================================================================
Exception in thread "streaming-job-executor-0" java.lang.Error: java.lang.InterruptedException
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1155)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.InterruptedException
    at java.lang.Object.wait(Native Method)
    at java.lang.Object.wait(Object.java:502)
    at org.apache.spark.scheduler.JobWaiter.awaitResult(JobWaiter.scala:73)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:612)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:912)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:910)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.foreach(RDD.scala:910)
    ...
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    ... 2 more
==============================================================================
I build with Maven 3.5.0; the related dependencies in my project's pom.xml are:
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-core</artifactId>
  <version>2.0.0</version>
</dependency>
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-kafka</artifactId>
  <version>2.0.0</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>1.6.0</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.10</artifactId>
  <version>1.6.0</version>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.10.1.1</version>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.10</artifactId>
  <version>0.10.1.1</version>
</dependency>
When I run the above code with the Spark runner in local mode (local[4]), the
project works well (about 2,000~4,000 records/s). However, when I run it in
standalone mode, it fails with the error above.
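The only thing I change between the two runs is the Spark master; a minimal
sketch of that difference (same option names as in the code above):
==============================================================================
SparkPipelineOptions options =
    PipelineOptionsFactory.as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);

// Local mode (works, ~2000-4000 records/s):
options.setSparkMaster("local[4]");

// Standalone mode (fails with the InterruptedException above):
// options.setSparkMaster("spark://ubuntu8:7077");
==============================================================================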
If you have any idea about the "streaming-job-executor-0" error, I am looking
forward to hearing from you.
Note: the spark-submit command line is "./spark-submit --class
com.itri.beam.kafkatest --master spark://ubuntu8:7077
/root/BeamKafkaAdvanced-0.1-shaded.jar --runner=SparkRunner"
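I am not sure whether it matters, but since the runner and master are
hard-coded in the options, I suspect the --runner=SparkRunner flag on the
command line is ignored. A sketch of how the options could instead be built
from the program arguments (method names are from the Beam 2.0.0
PipelineOptionsFactory API; the rest of the pipeline is unchanged):
==============================================================================
public static void main(String[] args) {
  // Sketch: parse --runner=SparkRunner (and any other flags) from the
  // spark-submit program arguments instead of hard-coding them.
  SparkPipelineOptions options = PipelineOptionsFactory
      .fromArgs(args)
      .withValidation()
      .as(SparkPipelineOptions.class);

  Pipeline p = Pipeline.create(options);
  // ... same KafkaIO.read() / Window / Count.perKey() pipeline as above ...

  // waitUntilFinish() blocks until the (streaming) job terminates.
  p.run().waitUntilFinish();
}
==============================================================================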
Thanks
Rick
--
This email may contain confidential information. Please do not use or disclose
it in any way and delete it if you are not the intended recipient.