Dear Nicolas,

Yes, I have set this configuration, as:
==================================
Pipeline p = Pipeline.create(options);
options.setMaxRecordsPerBatch(1000L);
options.setBatchIntervalMillis(1000L);
options.setSparkMaster("local[*]");
…
PCollection<KV<Integer, String>> readData1 = readData
  .apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardSeconds(1)))
    .triggering(AfterWatermark.pastEndOfWindow()
      .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
        .plusDelayOf(Duration.ZERO)))
    .withAllowedLateness(Duration.ZERO)
    .discardingFiredPanes());
==================================
However, the result shows the following:
==================================
1158 1000
1159 0
1160 0
1161 0
1162 0
1163 0
1164 1000
1165 0
1166 0
1167 0
1168 0
1169 0
1170 0
…
==================================
Rick

From: Nicolas Viard [mailto:nicolas.vi...@predict.fr]
Sent: Monday, July 30, 2018 5:35 PM
To: user@beam.apache.org
Subject: RE: A windows/trigger Question with kafkaIO over Spark Runner

Hello,

I think Spark has a default windowing strategy and pulls data from Kafka every X ms. You can override it using SparkPipelineOptions.setBatchIntervalMillis(1000).

Best regards,
Nicolas
________________________________
From: linr...@itri.org.tw <linr...@itri.org.tw>
Sent: Monday, July 30, 2018 10:58:26
To: user@beam.apache.org
Subject: A windows/trigger Question with kafkaIO over Spark Runner

Dear all,

I have a question about the use of windows/triggers.
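A back-of-envelope check of the counts in the reply above (1000 followed by runs of zeros): if each one-second micro-batch is capped by setMaxRecordsPerBatch(1000L) while the producer sends 10,000 records/sec, the per-batch count can never exceed the cap and a backlog accumulates. The following is a minimal pure-Java sketch of that arithmetic, not Beam code; whether the Spark runner's cap behaves exactly this way is an assumption:

```java
// Hypothetical back-of-envelope model (not Beam/Spark code): one micro-batch
// per second, each batch capped at maxRecordsPerBatch = 1000, producer at
// 10,000 records/sec. The consumed count per second stays pinned at the cap.
public class BatchCapCheck {
    public static void main(String[] args) {
        long produceRate = 10_000; // records produced per second
        long batchCap = 1_000;     // assumed effect of setMaxRecordsPerBatch(1000L)
        long backlog = 0;          // unread records left in Kafka
        for (int second = 0; second < 5; second++) {
            backlog += produceRate;
            long consumed = Math.min(backlog, batchCap); // batch can never exceed the cap
            backlog -= consumed;
            System.out.println("second " + second + ": consumed " + consumed
                + ", backlog " + backlog);
        }
    }
}
```

Under this model the per-second consumed count is always 1000 and the backlog grows by 9000 each second, which is at least consistent with the observed counts never exceeding 1000.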
The following versions of related tools are used in my running program:
==================================
Beam 2.4.0 (Direct runner and Spark runner)
Spark 2.3.1 (local mode)
Kafka 2.11-0.10.1.1
Scala 2.11.8
Java 1.8
==================================
My programs (KafkaToKafka.java and StarterPipeline.java) are on GitHub:
https://github.com/LinRick/beamkafkaIO
The Kafka producer is configured as:
==================================
/kafka_broker/bin/kafka-producer-perf-test.sh \
  --num-records 10000000 \
  --record-size 100 \
  --topic kafkasink \
  --throughput 10000 \
  --producer-props acks=0 bootstrap.servers=ubuntu7:9092 batch.size=1000
==================================
The Kafka broker console shows:
==================================
...
49992 records sent, 9998.4 records/sec (0.95 MB/sec), 1.0 ms avg latency, 146.0 max latency.
50040 records sent, 10008.0 records/sec (0.95 MB/sec), 0.2 ms avg latency, 5.0 max latency.
50019 records sent, 10001.8 records/sec (0.95 MB/sec), 0.2 ms avg latency, 1.0 max latency.
50011 records sent, 10002.2 records/sec (0.95 MB/sec), 0.2 ms avg latency, 3.0 max latency.
50020 records sent, 10002.0 records/sec (0.95 MB/sec), 0.2 ms avg latency, 1.0 max latency.
...
==================================
We expect about 10,000 records in each one-second window with the following settings in StarterPipeline.java:
==================================
…
SparkPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
  .withValidation().as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
Pipeline p = Pipeline.create(options);
options.setMaxRecordsPerBatch(1000L);
options.setSparkMaster("local[*]");
PCollection<KV<Integer, String>> readData = p.apply(KafkaIO.<Integer, String>read()
  .withBootstrapServers("ubuntu7:9092")
  .withTopic("kafkasink")
  .withKeyDeserializer(IntegerDeserializer.class)
  .withValueDeserializer(StringDeserializer.class)
  //.withMaxNumRecords(500000)
  .withoutMetadata());
PCollection<KV<Integer, String>> readData1 = readData
  .apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardSeconds(1)))
    .triggering(AfterWatermark.pastEndOfWindow()
      .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
        .plusDelayOf(Duration.ZERO)))
    .withAllowedLateness(Duration.ZERO)
    .discardingFiredPanes());
…
==================================
The processed data are imported into PostgreSQL. The results in the DB are:
224 3000
225 0
226 3000
227 0
228 0
236 0
237 0
238 5000
Unfortunately, the results we were hoping for are:
224 9000
225 11000
226 9505
227 9829
228 10001
I do not know how to deal with this situation; could it be caused by data latency? In addition, I am not sure whether this issue concerns KafkaIO or my Spark runner settings, as in BEAM-4632<https://issues.apache.org/jira/browse/BEAM-4632>.
If any further information is needed, I will gladly provide it as soon as possible. I would highly appreciate your help in overcoming this, and I look forward to hearing from you.

Sincerely yours,
Rick
--
本信件可能包含工研院機密資訊,非指定之收件者,請勿使用或揭露本信件內容,並請銷毀此信件。 This email may contain confidential information. Please do not use or disclose it in any way and delete it if you are not the intended recipient.
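Independently of the runner, the expected per-window counts can be sanity-checked with the same arithmetic that fixed windowing uses: a timestamp ts falls into the window starting at ts - (ts % windowSize). The following is a minimal pure-Java sketch (no Beam dependency) that assigns synthetic, evenly spaced timestamps at 10,000 records/sec to one-second windows; the timestamps and rates are illustrative assumptions:

```java
import java.util.Map;
import java.util.TreeMap;

// Illustration of fixed-window assignment for 1-second windows:
// each event timestamp ts (millis) belongs to the window starting at
// ts - (ts % 1000). With a uniform 10,000 records/sec feed, every
// window should count exactly 10,000 records.
public class FixedWindowCounts {
    public static void main(String[] args) {
        long windowSizeMillis = 1_000L;
        Map<Long, Integer> counts = new TreeMap<>();
        // Synthetic timestamps: 30,000 records over 3 seconds,
        // i.e. 10 records per millisecond.
        for (long i = 0; i < 30_000; i++) {
            long ts = i / 10;
            long windowStart = ts - (ts % windowSizeMillis);
            counts.merge(windowStart, 1, Integer::sum);
        }
        System.out.println(counts); // each 1-second window holds 10,000 records
    }
}
```

Counts far below this (as in the PostgreSQL table above) therefore point at how records are being pulled into each micro-batch rather than at the window assignment itself.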