Dear Nicolas,

Yes, I have already set this configuration, as follows:

Pipeline p = Pipeline.create(options);
options.setMaxRecordsPerBatch(1000L);
options.setBatchIntervalMillis(1000L);
options.setSparkMaster("local[*]");
…
PCollection<KV<Integer, String>> readData1 = readData
    .apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardSeconds(1)))
      .triggering(AfterWatermark.pastEndOfWindow()
        .withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.ZERO)))
      .withAllowedLateness(Duration.ZERO)
      .discardingFiredPanes());

However, the results still look like the following:
“1158      1000
1159        0
1160        0
1161        0
1162        0
1163        0
1164        1000
1165        0
1166        0
1167        0
1168        0
1169        0
1170        0
….”
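
One thing that may be worth checking against these numbers is the interaction between the two options set above. The following is a minimal sketch in plain Java, assuming that maxRecordsPerBatch caps the total number of records read in each micro-batch and that one micro-batch runs per batch interval. The class and method names are hypothetical, and the producer rate of 10,000 records per second is taken from the original message further down the thread. It shows only the upper-bound arithmetic, not how the runner actually schedules reads.

```java
public class BatchCapSketch {
    /**
     * Upper bound on records ingested per second, assuming one micro-batch
     * per batch interval and a fixed cap on records per micro-batch.
     */
    static long maxRecordsPerSecond(long maxRecordsPerBatch, long batchIntervalMillis) {
        return maxRecordsPerBatch * 1000L / batchIntervalMillis;
    }

    public static void main(String[] args) {
        // Values from setMaxRecordsPerBatch(1000L) and setBatchIntervalMillis(1000L).
        long cap = maxRecordsPerSecond(1000L, 1000L);
        // Producer rate from the --throughput flag of the perf-test command.
        long produced = 10_000L;

        System.out.println("ingest cap per second:     " + cap);
        System.out.println("produced per second:       " + produced);
        System.out.println("backlog growth per second: " + Math.max(0L, produced - cap));
    }
}
```

Under these assumptions the pipeline could read at most 1,000 records per second, which would be consistent with the counts above being multiples of 1,000 rather than close to 10,000.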

Rick

From: Nicolas Viard [mailto:nicolas.vi...@predict.fr]
Sent: Monday, July 30, 2018 5:35 PM
To: user@beam.apache.org
Subject: RE: A windows/trigger Question with kafkaIO over Spark Runner


Hello,

I think Spark has a default windowing strategy and pulls data from Kafka every
X ms.

You can override it using SparkPipelineOptions.setBatchIntervalMillis(1000).

Best regards,

Nicolas

________________________________
From: linr...@itri.org.tw <linr...@itri.org.tw>
Sent: Monday, July 30, 2018 10:58:26
To: user@beam.apache.org
Subject: A windows/trigger Question with kafkaIO over Spark Runner


Dear all



I have a question about the use of windows/triggers.

My program runs with the following versions of the related tools:

==================================

Beam 2.4.0 (Direct runner and Spark runner)

Spark 2.3.1 (local mode)

Kafka: 2.11-0.10.1.1

scala: 2.11.8

java: 1.8

==================================

My programs (KafkaToKafka.java and StarterPipeline.java) are available on
GitHub: https://github.com/LinRick/beamkafkaIO

The test data is sent to the Kafka broker with the following producer command:

==================================

/kafka_broker/bin/kafka-producer-perf-test.sh \
  --num-records 10000000 \
  --record-size 100 \
  --topic kafkasink \
  --throughput 10000 \
  --producer-props acks=0 bootstrap.servers=ubuntu7:9092 batch.size=1000

==================================



The console output of the producer is:

==================================

...

49992 records sent, 9998.4 records/sec (0.95 MB/sec), 1.0 ms avg latency, 146.0 max latency.
50040 records sent, 10008.0 records/sec (0.95 MB/sec), 0.2 ms avg latency, 5.0 max latency.
50019 records sent, 10001.8 records/sec (0.95 MB/sec), 0.2 ms avg latency, 1.0 max latency.
50011 records sent, 10002.2 records/sec (0.95 MB/sec), 0.2 ms avg latency, 3.0 max latency.
50020 records sent, 10002.0 records/sec (0.95 MB/sec), 0.2 ms avg latency, 1.0 max latency.

...

==================================



We expect about 10,000 records in each one-second window with the
following settings in my program StarterPipeline.java:

==================================

…

SparkPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
  .withValidation().as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
Pipeline p = Pipeline.create(options);
options.setMaxRecordsPerBatch(1000L);
options.setSparkMaster("local[*]");

PCollection<KV<Integer, String>> readData = p.apply(KafkaIO.<Integer, String>read()
  .withBootstrapServers("ubuntu7:9092")
  .withTopic("kafkasink")
  .withKeyDeserializer(IntegerDeserializer.class)
  .withValueDeserializer(StringDeserializer.class)
  //.withMaxNumRecords(500000)
  .withoutMetadata());

PCollection<KV<Integer, String>> readData1 = readData
  .apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardSeconds(1)))
    .triggering(AfterWatermark.pastEndOfWindow()
      .withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.ZERO)))
    .withAllowedLateness(Duration.ZERO)
    .discardingFiredPanes());

…

==================================
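
To make the expectation concrete, here is a minimal plain-Java sketch of what fixed one-second windows do to a steady stream: each record is assigned to the window containing its timestamp, and the records are counted per window. The class name, method names, and synthetic timestamps are hypothetical and used for illustration only. Which timestamp a real record carries depends on the KafkaIO timestamp settings, which this sketch does not model.

```java
import java.util.Map;
import java.util.TreeMap;

public class FixedWindowCountSketch {
    /** Start of the fixed window (zero offset) containing a non-negative timestamp. */
    static long windowStart(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - (timestampMillis % windowSizeMillis);
    }

    /** Counts records per window, keyed by window start in milliseconds. */
    static Map<Long, Long> countPerWindow(long[] timestampsMillis, long windowSizeMillis) {
        Map<Long, Long> counts = new TreeMap<>();
        for (long ts : timestampsMillis) {
            counts.merge(windowStart(ts, windowSizeMillis), 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Synthetic input: 10 records per millisecond for 3 seconds,
        // i.e. a steady 10,000 records per second.
        long[] timestamps = new long[30_000];
        for (int i = 0; i < timestamps.length; i++) {
            timestamps[i] = i / 10;
        }
        // Prints {0=10000, 1000=10000, 2000=10000}
        System.out.println(countPerWindow(timestamps, 1000L));
    }
}
```

With a steady input and timestamps that track arrival time, every window holds roughly the same count, which is the behaviour we are hoping to see in the database.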

The processed data is written to PostgreSQL.

The results in the database are as follows:

224  3000

225  0

226  3000

227  0

228  0

236  0

237  0

238  5000



Unfortunately, this is not what we expected. The results we are looking for are:

224  9000

225  11000

226  9505

227  9829

228  10001



I do not know how to deal with this situation. Could it be related to data latency?



In addition, I am not sure whether this issue lies in KafkaIO or in my
Spark runner settings, as in issue BEAM-4632
(https://issues.apache.org/jira/browse/BEAM-4632).



If any further information is needed, please let me know and I will provide
it as soon as possible.



I would greatly appreciate any help in resolving this.



I am looking forward to hearing from you.



Sincerely yours,



Rick


--
本信件可能包含工研院機密資訊,非指定之收件者,請勿使用或揭露本信件內容,並請銷毀此信件。 This email may contain 
confidential information. Please do not use or disclose it in any way and 
delete it if you are not the intended recipient.

