Dear all,

I have a question about the use of windows/triggers. My running program uses the following versions of the related tools:
==================================
Beam 2.4.0 (Direct runner and Spark runner)
Spark 2.3.1 (local mode)
Kafka 2.11-0.10.1.1
Scala 2.11.8
Java 1.8
==================================

My programs (KafkaToKafka.java and StarterPipeline.java) are on GitHub:
https://github.com/LinRick/beamkafkaIO

The Kafka topic is fed with kafka-producer-perf-test.sh, configured as follows:
==================================
/kafka_broker/bin/kafka-producer-perf-test.sh \
  --num-records 10000000 \
  --record-size 100 \
  --topic kafkasink \
  --throughput 10000 \
  --producer-props acks=0 bootstrap.servers=ubuntu7:9092 batch.size=1000
==================================
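As a side check (not part of my programs), a plain Kafka consumer such as the following sketch could be used to confirm, independently of Beam, that the topic really receives about 10,000 records per second; the class name TopicRateCheck and the group id rate-check are made up for illustration:
==================================
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class TopicRateCheck {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "ubuntu7:9092");
    props.put("group.id", "rate-check");  // illustrative group id
    props.put("auto.offset.reset", "latest");
    props.put("key.deserializer", IntegerDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());

    try (KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("kafkasink"));
      long count = 0;
      long windowStart = System.currentTimeMillis();
      while (true) {
        ConsumerRecords<Integer, String> records = consumer.poll(100);
        count += records.count();
        long now = System.currentTimeMillis();
        if (now - windowStart >= 1000) {
          // Print how many records arrived on the topic in the last second.
          System.out.println("records in last second: " + count);
          count = 0;
          windowStart = now;
        }
      }
    }
  }
}
==================================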
The console output of the producer is:
==================================
...
49992 records sent, 9998.4 records/sec (0.95 MB/sec), 1.0 ms avg latency, 146.0 max latency.
50040 records sent, 10008.0 records/sec (0.95 MB/sec), 0.2 ms avg latency, 5.0 max latency.
50019 records sent, 10001.8 records/sec (0.95 MB/sec), 0.2 ms avg latency, 1.0 max latency.
50011 records sent, 10002.2 records/sec (0.95 MB/sec), 0.2 ms avg latency, 3.0 max latency.
50020 records sent, 10002.0 records/sec (0.95 MB/sec), 0.2 ms avg latency, 1.0 max latency.
...
==================================

With the following settings in my program StarterPipeline.java, we expect about 10,000 records in each one-second window:
==================================
…
SparkPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
    .withValidation().as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
Pipeline p = Pipeline.create(options);
options.setMaxRecordsPerBatch(1000L);
options.setSparkMaster("local[*]");

PCollection<KV<Integer, String>> readData = p.apply(KafkaIO.<Integer, String>read()
    .withBootstrapServers("ubuntu7:9092")
    .withTopic("kafkasink")
    .withKeyDeserializer(IntegerDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    //.withMaxNumRecords(500000)
    .withoutMetadata());

PCollection<KV<Integer, String>> readData1 = readData
    .apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardSeconds(1)))
        .triggering(AfterWatermark.pastEndOfWindow()
            .withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.ZERO)))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes());
…
==================================

The processed data are then written into PostgreSQL. The results in the DB are as follows (a simplified sketch of the counting step is in the P.S. below):
==================================
224   3000
225   0
226   3000
227   0
228   0
236   0
237   0
238   5000
==================================

Unfortunately, these are not the results we were hoping for, which would look roughly like:
==================================
224   9000
225   11000
226   9505
227   9829
228   10001
==================================

I do not know how to deal with this situation; could it be caused by data latency?

In addition, I am not sure whether this issue is related to KafkaIO or to my settings of the Spark runner, as in BEAM-4632 <https://issues.apache.org/jira/browse/BEAM-4632>.

If any further information is needed, please let me know and I will provide it as soon as possible. I would highly appreciate it if you could help me overcome this. I am looking forward to hearing from you.

Sincerely yours,
Rick
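P.S. For reference, the kind of per-window counting I am trying to do is roughly the following minimal sketch; it continues from readData1 in the snippet above, and the transform names and DoFn are only illustrative (the actual code is in StarterPipeline.java on GitHub):
==================================
// Minimal sketch only. Needs imports: org.apache.beam.sdk.transforms.Combine,
// Count, DoFn, ParDo and org.apache.beam.sdk.transforms.windowing.BoundedWindow,
// IntervalWindow.

// Count the elements of every fired pane of each one-second window.
PCollection<Long> countsPerWindow = readData1
    .apply("CountPerWindow",
        Combine.globally(Count.<KV<Integer, String>>combineFn()).withoutDefaults());

// Pair each count with the start of its window so that (window, count)
// rows can be written to PostgreSQL.
PCollection<KV<Long, Long>> windowedCounts = countsPerWindow
    .apply("PairWithWindowStart", ParDo.of(new DoFn<Long, KV<Long, Long>>() {
      @ProcessElement
      public void processElement(ProcessContext c, BoundedWindow window) {
        long windowStart = ((IntervalWindow) window).start().getMillis();
        c.output(KV.of(windowStart, c.element()));
      }
    }));
==================================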