Can you please update the version of Beam to at least version 2.2.0.
There were some important fixes in streaming after the 2.0.0 release
so this could be related. Ideally you should use the latest released
version (2.4.0). Remember that starting with Beam 2.3.0 the Spark
runner is based on Spark 2.

On Wed, Jun 13, 2018 at 5:11 PM Raghu Angadi <rang...@google.com> wrote:
>
> Can you check the logs on the worker?
>
> On Wed, Jun 13, 2018 at 2:26 AM <linr...@itri.org.tw> wrote:
>>
>> Dear all,
>>
>>
>>
>> I am using the kafkaIO in my project (Beam 2.0.0 with Spark runner).
>>
>> My running environment is:
>>
>> OS: Ubuntn 14.04.3 LTS
>>
>> The different version for these tools:
>>
>> JAVA: JDK 1.8
>>
>> Beam 2.0.0 (Spark runner with Standalone mode)
>>
>> Spark 1.6.0
>>
>> Standalone mode :One driver node: ubuntu7; One master node: ubuntu8; Two 
>> worker nodes: ubuntu8 and ubuntu9
>>
>> Kafka: 2.10-0.10.1.1
>>
>>
>>
>> The java code of my project is:
>>
>> ==============================================================================
>>
>> SparkPipelineOptions options = 
>> PipelineOptionsFactory.as(SparkPipelineOptions.class);
>>
>> options.setRunner(SparkRunner.class);
>>
>> options.setSparkMaster("spark://ubuntu8:7077");
>>
>> options.setAppName("App kafkaBeamTest");
>>
>> options.setJobName("Job kafkaBeamTest");
>>
>> options.setMaxRecordsPerBatch(1000L);
>>
>>
>>
>> Pipeline p = Pipeline.create(options);
>>
>>
>>
>> System.out.println("Beamtokafka");
>>
>> PCollection<KV<Long, String>> readData = p.apply(KafkaIO.<Long, String>read()
>>
>> .withBootstrapServers(ubuntu7:9092)
>>
>> .withTopic("kafkasink")
>>
>> .withKeyDeserializer(LongDeserializer.class)
>>
>> .withValueDeserializer(StringDeserializer.class)
>>
>>        .withoutMetadata()
>>
>>        );
>>
>>
>>
>> PCollection<KV<Long, String>> readDivideData = readData.
>>
>> apply(Window.<KV<Long,String>>into(FixedWindows.of(Duration.standardSeconds(1)))
>>
>>      .triggering(AfterWatermark.pastEndOfWindow()
>>
>>        
>> .withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.ZERO)))
>>
>>      .withAllowedLateness(Duration.ZERO) .discardingFiredPanes());
>>
>>
>>
>> System.out.println("CountData");
>>
>>
>>
>> PCollection<KV<Long, Long>> countData = readDivideData.apply(Count.perKey());
>>
>>
>>
>> p.run();
>>
>> ==============================================================================
>>
>>
>>
>> The message of error is:
>>
>> ==============================================================================
>>
>> Exception in thread "streaming-job-executor-0" java.lang.Error: 
>> java.lang.InterruptedException
>>
>>         at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1155)
>>
>>         at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>
>>         at java.lang.Thread.run(Thread.java:748)
>>
>> Caused by: java.lang.InterruptedException
>>
>>         at java.lang.Object.wait(Native Method)
>>
>>         at java.lang.Object.wait(Object.java:502)
>>
>>         at 
>> org.apache.spark.scheduler.JobWaiter.awaitResult(JobWaiter.scala:73)
>>
>>         at 
>> org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:612)
>>
>>         at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
>>
>>         at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
>>
>>         at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
>>
>>         at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
>>
>>         at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:912)
>>
>>         at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:910)
>>
>>         at 
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
>>
>>         at 
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
>>
>>         at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
>>
>>         at org.apache.spark.rdd.RDD.foreach(RDD.scala:910)
>>
>> …
>>
>> at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
>>
>> at 
>> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
>>
>> at 
>> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
>>
>> at 
>> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
>>
>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>
>> at 
>> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
>>
>> at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>
>> ... 2 more
>>
>> ==============================================================================
>>
>>
>>
>> Maven 3.5.0, in which related dependencies are listed in my project’s 
>> pom.xml:
>>
>> <dependency>
>>
>> <groupId>org.apache.beam</groupId>
>>
>>   <artifactId>beam-sdks-java-core</artifactId>
>>
>>   <version>2.0.0</version>
>>
>> </dependency>
>>
>> <dependency>
>>
>> <groupId>org.apache.beam</groupId>
>>
>>    <artifactId>beam-sdks-java-io-kafka</artifactId>
>>
>>    <version>2.0.0</version>
>>
>> </dependency>
>>
>> <dependency>
>>
>> <groupId>org.apache.spark</groupId>
>>
>>   <artifactId>spark-core_2.10</artifactId>
>>
>>   <version>1.6.0</version>
>>
>> </dependency>
>>
>> <dependency>
>>
>> <groupId>org.apache.spark</groupId>
>>
>>   <artifactId>spark-streaming_2.10</artifactId>
>>
>>   <version>1.6.0</version>
>>
>> </dependency>
>>
>>
>>
>> <dependency>
>>
>> <groupId>org.apache.kafka</groupId>
>>
>>   <artifactId>kafka-clients</artifactId>
>>
>>   <version>0.10.1.1</version>
>>
>> </dependency>
>>
>> <dependency>
>>
>> <groupId>org.apache.kafka</groupId>
>>
>>   <artifactId>kafka_2.10</artifactId>
>>
>>   <version>0.10.1.1</version>
>>
>> </dependency>
>>
>>
>>
>>
>>
>> When I use the above code in Spark Runner (Local [4]), this project worked 
>> well (2000~4000 data/s). However, if I run it on Standalone mode, it failed 
>> along with the above error.
>>
>>
>>
>> If you have any idea about the error ("streaming-job-executor-0"), I am 
>> looking forward to hearing from you.
>>
>>
>>
>> Note that: perform command line is “./spark-submit --class 
>> com.itri.beam.kafkatest --master spark:// ubuntu8:7077 
>> /root/BeamKafkaAdvanced-0.1-shaded.jar --runner=SparkRunner”
>>
>>
>>
>> Thanks
>>
>>
>>
>> Rick
>>
>>
>>
>>
>>
>>
>>
>> --
>> 本信件可能包含工研院機密資訊,非指定之收件者,請勿使用或揭露本信件內容,並請銷毀此信件。 This email may contain 
>> confidential information. Please do not use or disclose it in any way and 
>> delete it if you are not the intended recipient.

Reply via email to