Re: java Flink local test failure (Could not create actor system)
Hi Vijay, Since version 1.7, Flink builds with Scala 2.11 (the default) and 2.12. Flink has APIs, libraries, and runtime modules written in Scala. Users of the Scala API and libraries may have to match the Scala version of Flink with the Scala version of their projects (because Scala is not strictly backward compatible). See [1] for more information. If you use Maven, the artifactId of a Flink component usually ends with the Scala version it was built against; for example, flink-streaming-java_2.11 was built against Scala 2.11. [1]. https://ci.apache.org/projects/flink/flink-docs-stable/flinkDev/building.html#scala-versions Regards, Smile -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Re: Flink application has slightly data loss using Processing Time
Hi Rainie, Could you please provide more information about your processing logic? Do you use window operators? If there is no time-based operator in your logic, late-arriving data won't be dropped by default, and there might be something wrong with your flatMap or filter operator. Otherwise, you can use sideOutputLateData() to get the late data of the window and have a look at it. See [1] for more information about sideOutputLateData(). [1]. https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#getting-late-data-as-a-side-output Regards, Smile -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
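For illustration, here is a minimal, self-contained sketch of sideOutputLateData() usage. The Tuple2 input, the one-minute tumbling window, and the timestamp assignment are assumptions made up for the example, not taken from the job discussed above:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

public class LateDataSideOutputExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // hypothetical input of (word, count) pairs; in a real job this would be your source
        DataStream<Tuple2<String, Long>> events = env
                .fromElements(Tuple2.of("a", 1L), Tuple2.of("b", 1L), Tuple2.of("a", 1L))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner((event, ts) -> System.currentTimeMillis()));

        // records arriving after the watermark has passed the end of their window
        // are sent to this side output instead of being silently dropped
        final OutputTag<Tuple2<String, Long>> lateTag =
                new OutputTag<Tuple2<String, Long>>("late-data") {};

        SingleOutputStreamOperator<Tuple2<String, Long>> counts = events
                .keyBy(value -> value.f0)
                .window(TumblingEventTimeWindows.of(Time.minutes(1)))
                .sideOutputLateData(lateTag)
                .sum(1);

        counts.print();
        counts.getSideOutput(lateTag).print();  // inspect the late records here

        env.execute("late data side output example");
    }
}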
Re: Trigger and completed Checkpointing do not appeared
Hi, Could you please change the source to an endless one? For example, a Kafka source or a custom source that implements SourceFunction [1]. env.readTextFile() doesn't wait for all the data to be read; it exits immediately after telling the readers what to read, so it may exit before the first checkpoint is triggered. See [2] for more information. [1]. https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.html [2]. https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.html#readTextFile-java.lang.String- Regards, Smile -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
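For illustration, a minimal sketch of a job with an endless Kafka source and checkpointing enabled; the topic name, broker address, consumer group, and the 10-second checkpoint interval are assumptions made up for the example:

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class EndlessSourceCheckpointExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // without this call no checkpoint is ever triggered
        env.enableCheckpointing(10_000); // checkpoint every 10 seconds

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.setProperty("group.id", "checkpoint-demo");          // assumed consumer group

        // an endless source keeps the tasks running, so checkpoints get a chance to be triggered
        DataStream<String> text = env.addSource(
                new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), props));

        text.print();
        env.execute("endless source checkpoint example");
    }
}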
Re: Trigger and completed Checkpointing do not appeared
Hi, After implementing SourceFunction, you can use it to create a DataStream with env.addSource() in your main method. For example, if you have a custom source class named CustomSource that implements SourceFunction<String>, then it can be used for getting the input data, and the if-statement after it can be removed: // get input data DataStream<String> text = env.addSource(new CustomSource()); ExampleCountSource in [1] implements SourceFunction<Long>, so it produces a DataStream of type Long, not String, such as: DataStream<Long> numbers = env.addSource(new ExampleCountSource()); If you only want to have a look at how checkpoints are triggered, see [2] for another sample that has a custom endless source named TransactionSource. When checkpointing is enabled, it will be triggered according to your configuration. It might be easier for a beginner than implementing a source yourself. However, it may not restore from a checkpoint perfectly since it doesn't implement CheckpointedFunction. That is to say, if you want your source to be restored successfully after failures, CheckpointedFunction is also necessary, and ExampleCountSource in [1] is a good example. [1]. https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.html [2]. https://ci.apache.org/projects/flink/flink-docs-release-1.12/try-flink/datastream_api.html Regards, Smile -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
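As a concrete illustration, here is a minimal sketch of such a CustomSource; the class name, the emitted dummy lines, and the sleep interval are assumptions made up for the example, and it intentionally does not implement CheckpointedFunction, so it only serves to keep the job running:

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class CustomSource implements SourceFunction<String> {

    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        long i = 0;
        while (isRunning) {
            // hold the checkpoint lock while emitting so emission and checkpoints don't interleave
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect("line " + i++);
            }
            Thread.sleep(100); // slow down the dummy stream so the job keeps running
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}

// in the main method:
// DataStream<String> text = env.addSource(new CustomSource());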
BinaryRowDataUtil.EMPTY_ROW was changed unexpectedly
Hi all, I'm trying to add mini-batch optimizations for the Regular Join (flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperator.java) in the Blink planner, and some test cases failed, such as AggregateITCase.testGroupBySingleValue. After debugging, I found that the heap memory backing BinaryRowDataUtil.EMPTY_ROW was changed unexpectedly, from [0,0,0,0,0,0,0,0] to [3,0,0,0,0,0,0,0], which led to some records being assigned a wrong key. However, my mini-batch code doesn't have any low-level operators with MemorySegment. I only buffered some records (RowData) in a Map, just like AbstractMapBundleOperator does. Object reuse was also disabled by env.getConfig().disableObjectReuse(). It looks like something goes wrong when StreamOneInputProcessor.processInput changes memory segments that do not belong to it (they belong to BinaryRowDataUtil.EMPTY_ROW instead). A debugging screenshot with more information is attached. I'm not familiar with org.apache.flink.core.memory.MemorySegment or sun.misc.Unsafe, so I'd like to ask the mailing list for help. Do you have any ideas about why it happens or where to check next? Thank you. Smile <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2787/EmptyRowDebug-20210511-1.png> -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Re: BinaryRowDataUtil.EMPTY_ROW was changed unexpectedly
Hi Chesnay Schepler, Thank you for your reply. I found the problem just now. My code modifies the key obtained from the KeySelector by updating its RowKind. Some key selectors, such as BinaryRowDataKeySelector, return a copy of the key [1], but EmptyRowDataKeySelector always returns the same object [2]. The test case AggregateITCase.testGroupBySingleValue with the SQL query "SELECT * FROM T2 WHERE T2.a < (SELECT count(*) * 0.3 FROM T1)" is indeed a global join without a key, so when I perform mykey.setRowKind(RowKind.DELETE), the object behind BinaryRowDataUtil.EMPTY_ROW is changed, and all records with an empty key get the wrong key. Should EmptyRowDataKeySelector also return a copy of BinaryRowDataUtil.EMPTY_ROW? Otherwise, the key should never be changed, because it may also be used by other records. Smile [1]. https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/keyselector/BinaryRowDataKeySelector.java#L49 [2]. https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/keyselector/EmptyRowDataKeySelector.java#L36 Chesnay Schepler wrote: > This is a bit concerning. Could you re-run your test with enabled assertions and/or modify BinaryRowData#assertIndexIsValid to always throw an error if one of the 2 assertions is not met? -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
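To illustrate the failure mode, here is a hypothetical, simplified sketch (not the actual selector code): mutating a key object that a selector hands out as a shared constant corrupts every later record that uses that constant, whereas returning a copy keeps the constant intact:

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.types.RowKind;

public class SharedKeyPitfall {

    // stands in for BinaryRowDataUtil.EMPTY_ROW: a single constant shared by all records without a key
    private static final GenericRowData SHARED_EMPTY_KEY = new GenericRowData(0);

    // like EmptyRowDataKeySelector: every call returns the very same object
    static GenericRowData sharedSelector() {
        return SHARED_EMPTY_KEY;
    }

    public static void main(String[] args) {
        GenericRowData key = sharedSelector();
        key.setRowKind(RowKind.DELETE); // analogous to mykey.setRowKind(RowKind.DELETE)

        // the shared constant itself was modified, so every later lookup now sees DELETE instead of INSERT
        System.out.println(sharedSelector().getRowKind()); // prints DELETE

        // a selector that returns a copy (as BinaryRowDataKeySelector does) would leave the constant untouched
    }
}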
Re: The heartbeat of JobManager timed out
Hi Alexey, We also have the same problem running on Yarn using Flink 1.9.0. JM log shows this: We are also looking for a way to troubleshoot this problem. Best regards. Smile Alexey Trenikhun wrote > Hello, > > I periodically see in JM log (Flink 12.2): > > {"ts":"2021-05-15T21:10:36.325Z","message":"The heartbeat of JobManager > with id be8225ebae1d6422b7f268c801044b05 timed > out.","logger_name":"org.apache.flink.runtime.resourcemanager.StandaloneResourceManager","thread_name":"flink-akka.actor.default-dispatcher-5","level":"INFO","level_value":2} > > How to diagnose/troubleshoot this problem? Why could JobManager, which is > co-located with resource manager timeout, I assume this is unlikely > network issue? > > Thanks, > Alexey -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Re: The heartbeat of JobManager timed out
JM log shows this: INFO org.apache.flink.yarn.YarnResourceManager - The heartbeat of JobManager with id 41e3ef1f248d24ddefdccd1887947106 timed out. -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Re: High DirectByteBuffer Usage
Hi, Are you sure that your growing memory came from DirectByteBuffer? What about metaspace? Flink 1.9 may have some metaspace leak after a full restart or fine-grained restart, see [1] and [2] for more details. And if you didn't set a max metaspace by -XX:MaxMetaspaceSize, it will grow indefinitely and finally cause an OOM kill. [1]. https://issues.apache.org/jira/browse/FLINK-16225 [2]. https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/debugging_classloading.html#unloading-of-dynamically-loaded-classes-in-user-code Regards Smile On 2021/07/15 18:22:56, bat man wrote: > I am not using the Kafka SSL port. > > On Thu, Jul 15, 2021 at 9:48 PM Alexey Trenikhun wrote: > > > Just in case, make sure that you are not using Kafka SSL port without > > setting security protocol, see [1] > > > > [1] https://issues.apache.org/jira/plugins/servlet/mobile#issue/KAFKA-4090 > > -- > > *From:* bat man > > *Sent:* Wednesday, July 14, 2021 10:55:54 AM > > *To:* Timo Walther > > *Cc:* user > > *Subject:* Re: High DirectByteBuffer Usage > > > > Hi Timo, > > > > I am looking at these options. > > However, I had a couple of questions - > > 1. The off-heap usage grows overtime. My job does not do any off-heap > > operations so I don't think there is a leak there. Even after GC it keeps > > adding a few MBs after hours of running. > > 2. Secondly, I am seeing as the incoming record volume increases the > > off-heap usage grows. What's the reason for this? > > > > I am using 1.9. Is there any known bug which is causing this issue? > > > > Thanks, > > Hemant > > > > On Wed, Jul 14, 2021 at 7:30 PM Timo Walther wrote: > > > > Hi Hemant, > > > > did you checkout the dedicated page for memory configuration and > > troubleshooting: > > > > > > https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/memory/mem_trouble/#outofmemoryerror-direct-buffer-memory > > > > > > https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/memory/mem_trouble/#container-memory-exceeded > > > > It is likely that the high number of output streams could cause your > > issues. > > > > Regards, > > Timo > > > > > > > > > > On 14.07.21 08:46, bat man wrote: > > > Hi, > > > I have a job which reads different streams from 5 kafka topics. It > > > filters data and then data is streamed to different operators for > > > processing. This step involves data shuffling. > > > > > > Also, once data is enriched in 4 joins(KeyedCoProcessFunction) > > > operators. After joining the data is written to different kafka topics. > > > There are a total of 16 different output streams which are written to 4 > > > topics. > > > > > > I have been facing some issues with yarn killing containers. I took the > > > heap dump and ran it through JXray [1]. Heap usage is not high. One > > > thing which stands out is off-heap usage which is very high. My guess is > > > this is what is killing the containers as the data inflow increases. > > > > > > Screenshot 2021-07-14 at 11.52.41 AM.png > > > > > > > > > From the stack above is this usage high because of many output streams > > > being written to kafka topics. As the stack shows RecordWriter holding > > > off this DirectByteBuffer. I have assigned Network Memory as 1GB, and > > > --MaxDirectMemorySize also shows ~1GB for task managers. > > > > > > From here[2] I found that setting -Djdk.nio.maxCachedBufferSize=262144 > > > limits the temp buffer cache. Will it help in this case? > > > jvm version used is - JVM: OpenJDK 64-Bit Server VM - Red Hat, Inc. 
- > > > 1.8/25.282-b08 > > > > > > [1] - https://jxray.com <https://jxray.com> > > > [2] - > > > > > https://dzone.com/articles/troubleshooting-problems-with-native-off-heap-memo > > > < > > https://dzone.com/articles/troubleshooting-problems-with-native-off-heap-memo > > > > > > > > > Thanks, > > > Hemant > > > > >
Test failed in flink-end-to-end-tests/flink-end-to-end-tests-common-kafka
Hi, I got an error when I tried to compile & package Flink (version 1.12 & the current master). It can be reproduced by running 'mvn clean test' under flink-end-to-end-tests/flink-end-to-end-tests-common-kafka. It seems that a necessary dependency for the test scope was missing and some classes cannot be found. After adding the dependency kafka-avro-serializer to the pom of flink-end-to-end-tests/flink-end-to-end-tests-common-kafka, everything goes well. I just wonder: is this a bug, or did I miss some local setting? Best regards. Smile Error logs attached: [INFO] < org.apache.flink:flink-end-to-end-tests-common-kafka > [INFO] Building Flink : E2E Tests : Common Kafka 1.13-SNAPSHOT [INFO] [ jar ]- Downloading ... [INFO] [INFO] --- maven-clean-plugin:3.1.0:clean (default-clean) @ flink-end-to-end-tests-common-kafka --- [INFO] Deleting /Users/smile/Downloads/W/code/flink/apache/master/flink/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/target [INFO] [INFO] --- maven-checkstyle-plugin:2.17:check (validate) @ flink-end-to-end-tests-common-kafka --- [INFO] [INFO] --- spotless-maven-plugin:2.4.2:check (spotless-check) @ flink-end-to-end-tests-common-kafka --- [INFO] [INFO] --- maven-enforcer-plugin:3.0.0-M1:enforce (enforce-maven-version) @ flink-end-to-end-tests-common-kafka --- [INFO] [INFO] --- maven-enforcer-plugin:3.0.0-M1:enforce (enforce-maven) @ flink-end-to-end-tests-common-kafka --- [INFO] [INFO] --- maven-enforcer-plugin:3.0.0-M1:enforce (ban-unsafe-snakeyaml) @ flink-end-to-end-tests-common-kafka --- [INFO] [INFO] --- maven-enforcer-plugin:3.0.0-M1:enforce (ban-unsafe-jackson) @ flink-end-to-end-tests-common-kafka --- [INFO] [INFO] --- maven-enforcer-plugin:3.0.0-M1:enforce (forbid-log4j-1) @ flink-end-to-end-tests-common-kafka --- [INFO] [INFO] --- maven-enforcer-plugin:3.0.0-M1:enforce (enforce-versions) @ flink-end-to-end-tests-common-kafka --- [INFO] [INFO] --- gmavenplus-plugin:1.8.1:execute (merge-categories) @ flink-end-to-end-tests-common-kafka --- [INFO] Using plugin classloader, includes GMavenPlus classpath. [INFO] Using Groovy 2.5.12 to perform execute. includes: org.apache.flink.tests.util.categories.Dummy excludes: [INFO] [INFO] --- directory-maven-plugin:0.1:highest-basedir (directories) @ flink-end-to-end-tests-common-kafka --- [INFO] Highest basedir set to: /Users/smile/Downloads/W/code/flink/apache/master/flink [INFO] [INFO] --- maven-remote-resources-plugin:1.5:process (process-resource-bundles) @ flink-end-to-end-tests-common-kafka --- [INFO] [INFO] --- maven-resources-plugin:3.1.0:resources (default-resources) @ flink-end-to-end-tests-common-kafka --- [INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory /Users/smile/Downloads/W/code/flink/apache/master/flink/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/resources [INFO] Copying 3 resources [INFO] [INFO] --- maven-compiler-plugin:3.8.0:compile (default-compile) @ flink-end-to-end-tests-common-kafka --- [INFO] Compiling 5 source files to /Users/smile/Downloads/W/code/flink/apache/master/flink/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/target/classes [INFO] /Users/smile/Downloads/W/code/flink/apache/master/flink/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaContainerClient.java: /Users/smile/Downloads/W/code/flink/apache/master/flink/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaContainerClient.java uses unchecked or unsafe operations. [INFO] /Users/smile/Downloads/W/code/flink/apache/master/flink/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaContainerClient.java: Recompile with -Xlint:unchecked for details. [INFO] [INFO] --- maven-resources-plugin:3.1.0:testResources (default-testResources) @ flink-end-to-end-tests-common-kafka --- [INFO] Using 'UTF-8' encoding to copy filtered resources. [INFO] Copying 2 resources [INFO] Copying 3 resources [INFO] [INFO] --- maven-compiler-plugin:3.8.0:testCompile (default-testCompile) @ flink-end-to-end-tests-common-kafka --- [INFO] Compiling 4 source files to /Users/smile/Downloads/W/code/flink/apache/master/flink/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/target/test-classes [INFO] - [ERROR] COMPILATION ERROR : [INFO] ----- [ERROR] /Users/smile/Downloads/W/code/flink/apache/master/flink/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientSchemaRegistryITCase.java:[113,20] cannot access io.confluent.kafka.serialize
Re:Re: Test failed in flink-end-to-end-tests/flink-end-to-end-tests-common-kafka
Yes, I've tried from both the root directory and the sub module. Neither of them works, and the error messages are the same. At 2021-01-21 23:22:12, "Robert Metzger" wrote: Since our CI system is able to build Flink, I believe it's a local issue. Are you sure that the build is failing when you build Flink from the root directory (not calling maven from within a maven module)? On Tue, Jan 19, 2021 at 11:19 AM Smile@LETTers wrote: Hi, I got an error when I tried to compile & package Flink (version 1.12 & the current master). It can be reproduced by running 'mvn clean test' under flink-end-to-end-tests/flink-end-to-end-tests-common-kafka. It seems that a necessary dependency for the test scope was missing and some classes cannot be found. After adding the dependency kafka-avro-serializer to the pom of flink-end-to-end-tests/flink-end-to-end-tests-common-kafka, everything goes well. I just wonder: is this a bug, or did I miss some local setting? Best regards. Smile
Re:Re: Re: Test failed in flink-end-to-end-tests/flink-end-to-end-tests-common-kafka
Hi Matthias, Sorry for being misleading. I meant kafka-schema-serializer rather than kafka-avro-serializer. io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe is in kafka-schema-serializer, and kafka-schema-serializer should be a dependency of kafka-avro-serializer according to their pom.xml files (see [1], [2]). I couldn't resolve a valid kafka-avro-serializer.jar in my mirror, so I downloaded it manually from [3] and installed it using: mvn install:install-file -DgroupId=io.confluent -DartifactId=kafka-avro-serializer -Dversion=5.5.2 -Dpackaging=jar -Dfile=kafka-avro-serializer-5.5.2.jar After that, I tried to build Flink and got the above exceptions. Then I tried to add the dependency of kafka-schema-serializer to flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/pom.xml (also manually installed it to my local maven repo) and everything went well. I also tried to remove it from the pom.xml after installing, and the exception came back. Maybe there was something wrong with the manually installed kafka-avro-serializer? [1]. https://mvnrepository.com/artifact/io.confluent/kafka-schema-serializer/usages [2]. https://packages.confluent.io/maven/io/confluent/kafka-avro-serializer/5.5.2/kafka-avro-serializer-5.5.2.pom [3]. https://mvnrepository.com/artifact/io.confluent/kafka-avro-serializer At 2021-01-22 21:22:51, "Matthias Pohl" wrote: Hi Smile, Have you used a clean checkout? I second Robert's statement considering that the dependency you're talking about is already part of flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/pom.xml. It also has the correct scope set both in master and release-1.12. Best, Matthias On Fri, Jan 22, 2021 at 10:04 AM Smile@LETTers wrote: Yes, I've tried from both the root directory and the sub module. Neither of them works, and the error messages are the same. At 2021-01-21 23:22:12, "Robert Metzger" wrote: Since our CI system is able to build Flink, I believe it's a local issue. Are you sure that the build is failing when you build Flink from the root directory (not calling maven from within a maven module)? On Tue, Jan 19, 2021 at 11:19 AM Smile@LETTers wrote: Hi, I got an error when I tried to compile & package Flink (version 1.12 & the current master). It can be reproduced by running 'mvn clean test' under flink-end-to-end-tests/flink-end-to-end-tests-common-kafka. It seems that a necessary dependency for the test scope was missing and some classes cannot be found. After adding the dependency kafka-avro-serializer to the pom of flink-end-to-end-tests/flink-end-to-end-tests-common-kafka, everything goes well. I just wonder: is this a bug, or did I miss some local setting? Best regards. Smile
Re:Re: Re: Re: Test failed in flink-end-to-end-tests/flink-end-to-end-tests-common-kafka
Thanks, Matthias! I tried your suggestion and it does work. After installing kafka-avro-serializer with its pom I got some more errors about io.confluent:kafka-schema-registry-parent:pom:5.5.2, io.confluent:rest-utils-parent:pom:5.5.2, and so on. After manually installing all these dependencies with their poms, the build succeeded.

---
mvn install:install-file -DgroupId=io.confluent -DartifactId=kafka-avro-serializer -Dversion=5.5.2 -Dpackaging=jar -Dfile=kafka-avro-serializer-5.5.2.jar -DpomFile=kafka-avro-serializer-5.5.2.pom
mvn install:install-file -DgroupId=io.confluent -DartifactId=kafka-schema-serializer -Dversion=5.5.2 -Dpackaging=jar -Dfile=kafka-schema-serializer-5.5.2.jar -DpomFile=kafka-schema-serializer-5.5.2.pom
mvn install:install-file -DgroupId=io.confluent -DartifactId=kafka-schema-registry-client -Dversion=5.5.2 -Dpackaging=jar -Dfile=kafka-schema-registry-client-5.5.2.jar -DpomFile=kafka-schema-registry-client-5.5.2.pom
mvn install:install-file -DgroupId=io.confluent -DartifactId=kafka-schema-registry-parent -Dversion=5.5.2 -Dpackaging=pom -Dfile=kafka-schema-registry-parent-5.5.2.pom -DpomFile=kafka-schema-registry-parent-5.5.2.pom
mvn install:install-file -DgroupId=io.confluent -DartifactId=common -Dversion=5.5.2 -Dpackaging=pom -Dfile=common-5.5.2.pom -DpomFile=common-5.5.2.pom
mvn install:install-file -DgroupId=io.confluent -DartifactId=common-parent -Dversion=5.5.2 -Dpackaging=pom -Dfile=common-parent-5.5.2.pom -DpomFile=common-parent-5.5.2.pom
mvn install:install-file -DgroupId=io.confluent -DartifactId=common-config -Dversion=5.5.2 -Dpackaging=jar -Dfile=common-config-5.5.2.jar -DpomFile=common-config-5.5.2.pom
mvn install:install-file -DgroupId=io.confluent -DartifactId=common-utils -Dversion=5.5.2 -Dpackaging=jar -Dfile=common-utils-5.5.2.jar -DpomFile=common-utils-5.5.2.pom
---

At 2021-01-25 16:36:12, "Matthias Pohl" wrote: Hi Smile, you missed installing the pom provided by mvnrepository.org [1]. Maven will install a basic pom if none is provided [2]. This basic pom file will not include any dependencies. You should be able to fix your problem by running your command above but adding the -DpomFile property with the pom file provided in [1]: mvn install:install-file -DgroupId=io.confluent -DartifactId=kafka-avro-serializer -Dversion=5.5.2 -Dpackaging=jar -Dfile=kafka-avro-serializer-5.5.2.jar -DpomFile=kafka-avro-serializer-5.5.2.pom [1] https://packages.confluent.io/maven/io/confluent/kafka-avro-serializer/5.5.2/kafka-avro-serializer-5.5.2.pom [2] https://maven.apache.org/plugins/maven-install-plugin/install-file-mojo.html#pomFile
Re:Re: Trigger and completed Checkpointing do not appeared
Hi, In short, [1] means whether the job will trigger checkpoints, and [2] means which operators will take action when checkpoints are triggered. If you use ExampleCountSource, flink-streaming-java should be a dependency in your pom.xml, and classes such as ListState, ListStateDescriptor, FunctionInitializationContext, FunctionSnapshotContext, CheckpointedFunction, and SourceFunction should be imported. By the way, I'm not sure whether this mail will be displayed well because it's the first time for me to write such a formatted one. If not, please let me know.

Detailed reply for question 1:

CheckpointedFunction is not necessary to trigger or complete a checkpoint. A job will trigger a checkpoint when all its tasks are running and checkpointing was enabled using code in [1], such as env.enableCheckpointing(xxx). Your job in the first mail didn't trigger a checkpoint because the source was not running at the time of the first checkpoint (not because checkpointing was not enabled). However, for some functions and operators, checkpoints make no sense. Take the code in the word count demo as an example:

source → flatMap → keyBy → sum → print

Assume the data:
aaa bbb
aaa bbb
ccc aaa
bbb aaa
ccc ddd

And assume the job failed because of some error after dealing with the first 3 lines:
aaa bbb
aaa bbb
ccc aaa
-- job fail --
-- job recover
bbb aaa
ccc ddd

When the source operator and the sum operator recover from a failure, they'll need a checkpoint. The source operator wants to know where to start (the 4th line) because some data may already have been processed before the failure. The sum operator wants to know the count of every word before the failure (aaa:3, bbb:2, ccc:1) so that new sentences can be counted correctly on top of it. However, the flatMap operator doesn't need a checkpoint at all: whenever a sentence comes, it splits it. This operator requires nothing from a checkpoint to recover. CheckpointedFunction in [2] exists to distinguish these stateful functions from all the other functions (it's not the only way, but it's the most flexible one). See [3] and [4] for more information.
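To make the stateless/stateful distinction concrete, here is a minimal sketch of what the flatMap step of such a word count could look like (the class name Splitter and the exact splitting logic are assumptions, not the demo's actual code); it keeps no state between records, so a checkpoint has nothing to store for it:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class Splitter implements FlatMapFunction<String, Tuple2<String, Long>> {
    @Override
    public void flatMap(String line, Collector<Tuple2<String, Long>> out) {
        // just split the incoming line and emit (word, 1) pairs; no memory of earlier lines is needed
        for (String word : line.split("\\s+")) {
            out.collect(Tuple2.of(word, 1L));
        }
    }
}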
Detailed reply for question 2:

Here's my sample code for ExampleCountSource.java:

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class ExampleCountSource implements SourceFunction<Long>, CheckpointedFunction {

    private long count = 0L;
    private volatile boolean isRunning = true;

    private transient ListState<Long> checkpointedCount;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (isRunning && count < 1000) {
            // this synchronized block ensures that state checkpointing,
            // internal state updates and emission of elements are an atomic operation
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(count);
                count++;
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        this.checkpointedCount = context
                .getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("count", Long.class));

        if (context.isRestored()) {
            for (Long count : this.checkpointedCount.get()) {
                this.count = count;
            }
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        this.checkpointedCount.clear();
        this.checkpointedCount.add(count);
    }
}

[1]. https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/checkpointing.html#java
[2]. https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.html
[3]. https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/stateful-stream-processing.html
[4]. https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.html

Regards,
Smile