[ https://issues.apache.org/jira/browse/BEAM-4632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16522866#comment-16522866 ]
Kenneth Knowles commented on BEAM-4632: --------------------------------------- I removed the "Fix Version" as I don't know that this was resolved. > kafkIO should be the streaming mode over spark runner > ----------------------------------------------------- > > Key: BEAM-4632 > URL: https://issues.apache.org/jira/browse/BEAM-4632 > Project: Beam > Issue Type: Bug > Components: io-java-kafka, runner-spark > Affects Versions: 2.4.0 > Environment: Ubuntu 16.04.4 LTS > Reporter: Rick Lin > Assignee: Alexey Romanenko > Priority: Major > > Dear sir, > The following versions of related tools are set in my running program: > ================================== > Beam 2.4.0 (Direct runner and Spark runner) > Spark 2.2.1 (local mode and standalone mode) > Kafka: 2.11-0.10.1.1 > scala: 2.11.8 > java: 1.8 > ================================== > My programs (KafkaToKafka.java and StarterPipeline.java) are as shown on my > github: [https://github.com/LinRick/beamkafkaIO], > The description of my situation is as: > {color:#14892c}The kafka broker is working and kafkaIO.read (consumer) is > used to capture data from the assigned broker ip > ([http://ubuntu7:9092)|http://ubuntu7:9092)./].{color} > {color:#14892c}The user manual of kafkaIO SDK (on > web:[https://beam.apache.org/documentation/sdks/javadoc/2.4.0/]) indicates > that the following parameters need to be set, and then the kafkaIO can work > well.{color} > {color:#FF0000}.withBootstrapServers("kafka broker ip:9092"){color} > {color:#FF0000} .withTopic("kafkasink"){color} > {color:#FF0000} .withKeyDeserializer(IntegerDeserializer.class){color} > {color:#FF0000} .withValueDeserializer(StringDeserializer.class) {color} > When i run my program with these settings over direct runner, i can find that > my program perform well. In addition, my running program is the streaming > mode. *However, i run these codes with the same settings (kafkaIO) over spark > runner, and my running program is not the streaming mode and is shutdown*. > Here, as mentioned on the website: > [https://beam.apache.org/documentation/runners/spark/], the performing > program will automatically set streaming mode. > Unfortunately, it failed for my program. > On the other hand, If i set the parameter kafkaIO.read.withMaxNumRecords > (1000) or kafkaIO.read.withMaxReadTime (Duration second), my program will > successfully execute as the batch mode (batch processing). > The steps of performing StarterPipeline.java in my program are: > step1 mvn compile exec:java -Dexec.mainClass=com.itri.beam.StarterPipeline > -Pspark2-runner -Dexec.args="--runner=SparkRunner" > step2 mvn clean package > step3 cp -rf target/beamkafkaIO-0.1.jar /root/ > step4 cd /spark-2.2.1-bin-hadoop2.6/bin > step5 ./spark-submit --class com.itri.beam.StarterPipeline --master local[4] > /root/beamkafkaIO-0.1.jar --runner=SparkRunner > I am not sure if this issue is a bug about kafkaIO or I was wrong with some > parameter settings over spark runner ? > I really can't handle it, so I hope to get help from you. > if any further information is needed, i am glad to be informed and will > provide to you as soon as possible. > I will highly appreciate it if you can help me to deal with this issue. > i am looking forward to hearing from you. > > Sincerely yours, > > Rick > -- This message was sent by Atlassian JIRA (v7.6.3#76005)