Dear all

I have a question about the use of windows/triggers.
The following versions of related tools are set in my running program:
==================================
Beam 2.4.0 (Direct runner and Spark runner)
Spark 2.3.1 (local mode)
Kafka: 2.11-0.10.1.1
scala: 2.11.8
java: 1.8
==================================
My programs (KafkaToKafka.java and StarterPipeline.java) are as shown on 
Github, as: https://github.com/LinRick/beamkafkaIO
The configuration setting of Kafka broker is:
==================================
/kafka_broker/bin/kafka-producer-perf-test.sh \
--num-records 10000000 \
--record-size 100 \
--topic kafkasink \
--throughput 10000 \
--producer-props acks=0 bootstrap.servers=ubuntu7:9092 batch.size=1000
==================================

The display of Kafka broker on console is as:
==================================
...
49992 records sent, 9998.4 records/sec (0.95 MB/sec), 1.0 ms avg latency, 146.0 
max latency.
50040 records sent, 10008.0 records/sec (0.95 MB/sec), 0.2 ms avg latency, 5.0 
m ax latency.
50019 records sent, 10001.8 records/sec (0.95 MB/sec), 0.2 ms avg latency, 1.0 
m ax latency.
50011 records sent, 10002.2 records/sec (0.95 MB/sec), 0.2 ms avg latency, 3.0 
m ax latency.
50020 records sent, 10002.0 records/sec (0.95 MB/sec), 0.2 ms avg latency, 1.0 
m ax latency.
...
==================================

We hope that there are about 10,000 in each window every second by the 
following settings in my program StarterPipeline.java:
==================================
…
SparkPipelineOptions options = PipelineOptionsFactory.fromArgs(args).
withValidation().as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
Pipeline p = Pipeline.create(options);
options.setMaxRecordsPerBatch(1000L);
options.setSparkMaster("local[*]");

PCollection<KV<Integer, String>> readData = p.apply(KafkaIO.<Integer, 
String>read()
  .withBootstrapServers("ubuntu7:9092")
  .withTopic("kafkasink")
  .withKeyDeserializer(IntegerDeserializer.class)
  .withValueDeserializer(StringDeserializer.class)
  //.withMaxNumRecords(500000)
  .withoutMetadata());

PCollection<KV<Integer, String>> readData1 = readData.
  apply(Window.<KV<Integer, 
String>>into(FixedWindows.of(Duration.standardSeconds(1)))
    .triggering(AfterWatermark.pastEndOfWindow()
      
.withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.ZERO)))
    .withAllowedLateness(Duration.ZERO)
    .discardingFiredPanes());
…
==================================
The processed data will be imported into PostgresSQL.
The display of results in DB is shown as follows.
224  3000
225  0
226  3000
227  0
228  0
236  0
237  0
238  5000

Unfortunately, results that we are looking forward to is:
224  9000
225  11000
226  9505
227  9829
228  10001

I do not know how to deal with this situation that maybe is about data latency?

1.    In addition, I am not sure if this issue is about kafkaIO or I was wrong 
with settings of spark runner? as the issue 
BEAM-4632<https://issues.apache.org/jira/browse/BEAM-4632>

If any further information is needed, I am glad to be informed and will provide 
to you as soon as possible.

I will highly appreciate it if you can help me to overcome this.

I am looking forward to hearing from you.

Sincerely yours,

Rick


--
本信件可能包含工研院機密資訊,非指定之收件者,請勿使用或揭露本信件內容,並請銷毀此信件。 This email may contain 
confidential information. Please do not use or disclose it in any way and 
delete it if you are not the intended recipient.

Reply via email to