Re: java Flink local test failure (Could not create actor system)

2021-02-28 Thread Smile
Hi Vijay,

Since version 1.7, Flink builds with Scala versions 2.11 (default) and 2.12.
Flink has APIs, libraries, and runtime modules written in Scala. Users of
the Scala API and libraries may have to match the Scala version of Flink
with the Scala version of their projects (because Scala is not strictly
backward compatible). See [1] for more information.

If you use Maven, the artifactId of Flink components usually ends with the
Scala version; for example, flink-streaming-java_2.11 means it was built
against Scala 2.11.

[1].
https://ci.apache.org/projects/flink/flink-docs-stable/flinkDev/building.html#scala-versions

Regards,
Smile





Re: Flink application has slightly data loss using Processing Time

2021-03-08 Thread Smile
Hi Rainie, 

Could you please provide more information about your processing logic?
Do you use window operators?
If there's no time-based operator in your logic, late-arriving data won't be
dropped by default, and there might be something wrong with your flatMap or
filter operator. Otherwise, you can use sideOutputLateData() to get the late
data of the window and have a look at it. See [1] for more information
about sideOutputLateData().
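
For illustration, here is a minimal sketch (not from your job; the stream,
types, and tag name below are assumptions) of how sideOutputLateData() can be
wired into a window so that late records are captured instead of silently
dropped:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

// "events" is assumed to be a DataStream<Tuple2<String, Long>> that already
// has timestamps and watermarks assigned upstream.
final OutputTag<Tuple2<String, Long>> lateTag =
        new OutputTag<Tuple2<String, Long>>("late-data") {};

SingleOutputStreamOperator<Tuple2<String, Long>> counts = events
        .keyBy(value -> value.f0)
        .window(TumblingEventTimeWindows.of(Time.minutes(1)))
        .sideOutputLateData(lateTag)   // late records go to the side output
        .sum(1);

// Have a look at whatever arrived too late for its window, e.g. just print it.
DataStream<Tuple2<String, Long>> lateData = counts.getSideOutput(lateTag);
lateData.print();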

[1].
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
 

Regards,
Smile





Re: Trigger and completed Checkpointing do not appeared

2021-03-08 Thread Smile
Hi,

Could you please change the source to an endless one, for example a Kafka
source or a custom source that implements SourceFunction ([1])?
env.readTextFile() won't wait for all the data to be processed; it exits
immediately after telling the readers what to read, so the job may finish
before the first checkpoint is triggered. See [2] for more information.
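
For example, a minimal sketch of a Kafka-backed endless source (the topic
name, bootstrap servers, and group id below are assumptions) using the
FlinkKafkaConsumer shipped with Flink 1.12:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "checkpoint-demo");

// An endless source keeps the job running, so checkpoints can be triggered.
DataStream<String> text = env.addSource(
        new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), props));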

[1].
https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.html
[2].
https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.html#readTextFile-java.lang.String-

Regards,
Smile





Re: Trigger and completed Checkpointing do not appeared

2021-03-09 Thread Smile
Hi, 

After implementing SourceFunction, you can use it to create a DataStream
using env.addSource() in your main method.
For example, if you have a custom source class named CustomSource that
implements SourceFunction<String>, it can be used to get the input data, and
the if-statement after it can be removed:

// get input data
DataStream<String> text = env.addSource(new CustomSource());


ExampleCountSource in [1] implements SourceFunction<Long>, which can be used
to get a DataStream with type Long, not String, such as:

DataStream<Long> numbers = env.addSource(new ExampleCountSource());


If you only want to have a look at how checkpoints are triggered, see [2]
for another sample that has a custom endless source named TransactionSource.
With checkpointing enabled, checkpoints will be triggered according to your
configuration; a sketch is shown below. It might be easier for a beginner
than implementing a source yourself.
However, it may not restore from a checkpoint perfectly, since it doesn't
implement CheckpointedFunction. That is to say, if you want your source to
be restored successfully after failures, CheckpointedFunction is also
necessary, and ExampleCountSource in [1] is a good example.
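
As a minimal sketch (assuming the flink-walkthrough-common dependency from the
walkthrough in [2] is on the classpath; class and package names are taken from
that walkthrough):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10_000L);   // trigger a checkpoint every 10 seconds

// TransactionSource never finishes, so the job stays running and checkpoints fire.
DataStream<Transaction> transactions = env.addSource(new TransactionSource());
transactions.print();

env.execute("checkpoint-demo");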


[1].
https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.html
[2].
https://ci.apache.org/projects/flink/flink-docs-release-1.12/try-flink/datastream_api.html

Regards,
Smile






BinaryRowDataUtil.EMPTY_ROW was changed unexpectedly

2021-05-11 Thread Smile
Hi all,

I'm trying to add mini-batch optimizations for Regular Join
(flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperator.java)
in the Blink planner, and some test cases fail, such as
AggregateITCase.testGroupBySingleValue.

After debugging, I found that the heap memory backing
BinaryRowDataUtil.EMPTY_ROW was changed unexpectedly, from [0,0,0,0,0,0,0,0]
to [3,0,0,0,0,0,0,0], which led to some records being assigned a wrong key.

However, my mini-batch code doesn't use any low-level operators with
MemorySegment. I only buffered some records (RowData) in a Map, just like
AbstractMapBundleOperator does. Object reuse was also disabled via
env.getConfig.disableObjectReuse(). It looks like
StreamOneInputProcessor.processInput modified memory segments that do not
belong to it (they belong to BinaryRowDataUtil.EMPTY_ROW instead). A
debugging screenshot with more information is attached.

I'm not familiar with org.apache.flink.core.memory.MemorySegment or
sun.misc.Unsafe, so I'd like to ask the mailing list for help. Do you have
any ideas about why this happens or where to check next?

Thank you.

Smile 

<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2787/EmptyRowDebug-20210511-1.png>
 





Re: BinaryRowDataUtil.EMPTY_ROW was changed unexpectedly

2021-05-11 Thread Smile
Hi Chesnay Schepler, 

Thank you for your reply.
I found the problem just now.

My code modifies the key obtained from the KeySelector by updating its RowKind.
Some key selectors such as BinaryRowDataKeySelector return a copy of the
key [1], but EmptyRowDataKeySelector always returns the same object [2].

The test case AggregateITCase.testGroupBySingleValue with the SQL query "SELECT
* FROM T2 WHERE T2.a < (SELECT count(*) * 0.3 FROM T1)" is indeed a global
join without a key, so when I call mykey.setRowKind(RowKind.DELETE), the
BinaryRowDataUtil.EMPTY_ROW object itself is changed, and all records with an
empty key get the wrong key.

Should EmptyRowDataKeySelector also return a copy of
BinaryRowDataUtil.EMPTY_ROW? Otherwise, the key should never be modified in
place, because it may also be used by other records.
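
For illustration, a minimal sketch of the defensive-copy idea (the variable
names are hypothetical and this is not an actual patch):

// "key" may be the shared BinaryRowDataUtil.EMPTY_ROW instance,
// so it must never be mutated in place.
BinaryRowData key = (BinaryRowData) keySelector.getKey(record);

// Work on a copy instead; only the copy carries the DELETE row kind.
BinaryRowData deleteKey = key.copy();
deleteKey.setRowKind(RowKind.DELETE);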

Smile

[1].
https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/keyselector/BinaryRowDataKeySelector.java#L49
[2].
https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/keyselector/EmptyRowDataKeySelector.java#L36



Chesnay Schepler wrote
> This is a bit concerning. Could you re-run your test with enabled 
> assertions and/or modify BinaryRowData#assertIndexIsValid to always 
> throw an error if one of the 2 assertions is not met?
> 
> On 5/11/2021 9:37 AM, Smile wrote:
>> Hi all,
>>
>> I'm trying to add mini-batch optimizations for Regular Join
>> (flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperator.java)
>> in Blink planner. And there're some test cases that failed, such as
>> AggregateITCase.testGroupBySingleValue.
>>
>> After debugging, I found the corresponding heap memory for
>> BinaryRowDataUtil.EMPTY_ROW was changed unexpectedly, from
>> [0,0,0,0,0,0,0,0]
>> to [3,0,0,0,0,0,0,0], and lead to some records being set to a wrong key.
>>
>> However, my mini-batch code doesn't have any low-level operators with
>> MemorySegment. I only buffered some records (RowData) in a Map just like
>> AbstractMapBundleOperator did. Object reuse was also disabled by
>> env.getConfig.disableObjectReuse(). It looks like there's something wrong
>> when StreamOneInputProcessor.processInput changed the memory segments
>> that
>> do not belong to it (belong to BinaryRowDataUtil.EMPTY_ROW instead). The
>> debugging page with more information was attached.
>>
>> I'm not familiar with org.apache.flink.core.memory.MemorySegment or
>> sun.misc.Unsafe, so I'd like to ask maillist for help. Do you have any
>> ideas
>> about why it happens or where to check next?
>>
>> Thank you.
>>
>> Smile
>>
>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2787/EmptyRowDebug-20210511-1.png>;
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/







Re: The heartbeat of JobManager timed out

2021-05-16 Thread Smile
Hi Alexey,

We also have the same problem running on Yarn using Flink 1.9.0.
JM log shows this:


We are also looking for a way to troubleshoot this problem.

Best regards.
Smile


Alexey Trenikhun wrote
> Hello,
> 
> I periodically see in JM log (Flink 12.2):
> 
> {"ts":"2021-05-15T21:10:36.325Z","message":"The heartbeat of JobManager
> with id be8225ebae1d6422b7f268c801044b05 timed
> out.","logger_name":"org.apache.flink.runtime.resourcemanager.StandaloneResourceManager","thread_name":"flink-akka.actor.default-dispatcher-5","level":"INFO","level_value":2}
> 
> How to diagnose/troubleshoot this problem? Why could JobManager, which is
> co-located with resource manager timeout, I assume this is unlikely
> network issue?
> 
> Thanks,
> Alexey







Re: The heartbeat of JobManager timed out

2021-05-16 Thread Smile
JM log shows this:

INFO  org.apache.flink.yarn.YarnResourceManager - The
heartbeat of JobManager with id 41e3ef1f248d24ddefdccd1887947106 timed out.






Re: High DirectByteBuffer Usage

2021-07-15 Thread Smile
Hi,
Are you sure that the growing memory comes from DirectByteBuffer? What about
metaspace?
Flink 1.9 may leak metaspace after a full restart or a fine-grained restart;
see [1] and [2] for more details. If you didn't cap metaspace with
-XX:MaxMetaspaceSize, it will grow indefinitely and eventually cause an OOM
kill.

[1]. https://issues.apache.org/jira/browse/FLINK-16225
[2]. 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/debugging_classloading.html#unloading-of-dynamically-loaded-classes-in-user-code

Regards
Smile

On 2021/07/15 18:22:56, bat man  wrote: 
> I am not using the Kafka SSL port.
> 
> On Thu, Jul 15, 2021 at 9:48 PM Alexey Trenikhun  wrote:
> 
> > Just in case, make sure that you are not using Kafka SSL port without
> > setting security protocol, see [1]
> >
> > [1] https://issues.apache.org/jira/plugins/servlet/mobile#issue/KAFKA-4090
> > --
> > *From:* bat man 
> > *Sent:* Wednesday, July 14, 2021 10:55:54 AM
> > *To:* Timo Walther 
> > *Cc:* user 
> > *Subject:* Re: High DirectByteBuffer Usage
> >
> > Hi Timo,
> >
> > I am looking at these options.
> > However, I had a couple of questions -
> > 1. The off-heap usage grows overtime. My job does not do any off-heap
> > operations so I don't think there is a leak there. Even after GC it keeps
> > adding a few MBs after hours of running.
> > 2. Secondly, I am seeing as the incoming record volume increases the
> > off-heap usage grows. What's the reason for this?
> >
> > I am using 1.9. Is there any known bug which is causing this issue?
> >
> > Thanks,
> > Hemant
> >
> > On Wed, Jul 14, 2021 at 7:30 PM Timo Walther  wrote:
> >
> > Hi Hemant,
> >
> > did you checkout the dedicated page for memory configuration and
> > troubleshooting:
> >
> >
> > https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/memory/mem_trouble/#outofmemoryerror-direct-buffer-memory
> >
> >
> > https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/memory/mem_trouble/#container-memory-exceeded
> >
> > It is likely that the high number of output streams could cause your
> > issues.
> >
> > Regards,
> > Timo
> >
> >
> >
> >
> > On 14.07.21 08:46, bat man wrote:
> > > Hi,
> > > I have a job which reads different streams from 5 kafka topics. It
> > > filters data and then data is streamed to different operators for
> > > processing. This step involves data shuffling.
> > >
> > > Also, once data is enriched in 4 joins(KeyedCoProcessFunction)
> > > operators. After joining the data is written to different kafka topics.
> > > There are a total of 16 different output streams which are written to 4
> > > topics.
> > >
> > > I have been facing some issues with yarn killing containers. I took the
> > > heap dump and ran it through JXray [1]. Heap usage is not high. One
> > > thing which stands out is off-heap usage which is very high. My guess is
> > > this is what is killing the containers as the data inflow increases.
> > >
> > > Screenshot 2021-07-14 at 11.52.41 AM.png
> > >
> > >
> > >  From the stack above is this usage high because of many output streams
> > > being written to kafka topics. As the stack shows RecordWriter holding
> > > off this DirectByteBuffer. I have assigned Network Memory as 1GB, and
> > > --MaxDirectMemorySize also shows ~1GB for task managers.
> > >
> > >  From here[2] I found that setting -Djdk.nio.maxCachedBufferSize=262144
> > > limits the temp buffer cache. Will it help in this case?
> > > jvm version used is - JVM: OpenJDK 64-Bit Server VM - Red Hat, Inc. -
> > > 1.8/25.282-b08
> > >
> > > [1] - https://jxray.com <https://jxray.com>
> > > [2] -
> > >
> > https://dzone.com/articles/troubleshooting-problems-with-native-off-heap-memo
> > > <
> > https://dzone.com/articles/troubleshooting-problems-with-native-off-heap-memo
> > >
> > >
> > > Thanks,
> > > Hemant
> >
> >
> 


Test failed in flink-end-to-end-tests/flink-end-to-end-tests-common-kafka

2021-01-19 Thread Smile@LETTers
Hi, 
I got an error when I tried to compile & package Flink (version 1.12 & current 
master).
It can be reproduced by running 'mvn clean test' under 
flink-end-to-end-tests/flink-end-to-end-tests-common-kafka.


It seems that a necessary dependency for the test scope was missing and some 
classes could not be found.
After adding the kafka-avro-serializer dependency to the pom of 
flink-end-to-end-tests/flink-end-to-end-tests-common-kafka, everything works.


I just wonder: is this a bug, or did I miss some local setting?


Best regards.
Smile


Error logs attached:




[INFO] < org.apache.flink:flink-end-to-end-tests-common-kafka >
[INFO] Building Flink : E2E Tests : Common Kafka 1.13-SNAPSHOT
[INFO] [ jar ]-
Downloading ...
[INFO]
[INFO] --- maven-clean-plugin:3.1.0:clean (default-clean) @ 
flink-end-to-end-tests-common-kafka ---
[INFO] Deleting 
/Users/smile/Downloads/W/code/flink/apache/master/flink/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/target
[INFO]
[INFO] --- maven-checkstyle-plugin:2.17:check (validate) @ 
flink-end-to-end-tests-common-kafka ---
[INFO]
[INFO] --- spotless-maven-plugin:2.4.2:check (spotless-check) @ 
flink-end-to-end-tests-common-kafka ---
[INFO]
[INFO] --- maven-enforcer-plugin:3.0.0-M1:enforce (enforce-maven-version) @ 
flink-end-to-end-tests-common-kafka ---
[INFO]
[INFO] --- maven-enforcer-plugin:3.0.0-M1:enforce (enforce-maven) @ 
flink-end-to-end-tests-common-kafka ---
[INFO]
[INFO] --- maven-enforcer-plugin:3.0.0-M1:enforce (ban-unsafe-snakeyaml) @ 
flink-end-to-end-tests-common-kafka ---
[INFO]
[INFO] --- maven-enforcer-plugin:3.0.0-M1:enforce (ban-unsafe-jackson) @ 
flink-end-to-end-tests-common-kafka ---
[INFO]
[INFO] --- maven-enforcer-plugin:3.0.0-M1:enforce (forbid-log4j-1) @ 
flink-end-to-end-tests-common-kafka ---
[INFO]
[INFO] --- maven-enforcer-plugin:3.0.0-M1:enforce (enforce-versions) @ 
flink-end-to-end-tests-common-kafka ---
[INFO]
[INFO] --- gmavenplus-plugin:1.8.1:execute (merge-categories) @ 
flink-end-to-end-tests-common-kafka ---
[INFO] Using plugin classloader, includes GMavenPlus classpath.
[INFO] Using Groovy 2.5.12 to perform execute.
includes: org.apache.flink.tests.util.categories.Dummy
excludes:
[INFO]
[INFO] --- directory-maven-plugin:0.1:highest-basedir (directories) @ 
flink-end-to-end-tests-common-kafka ---
[INFO] Highest basedir set to: 
/Users/smile/Downloads/W/code/flink/apache/master/flink
[INFO]
[INFO] --- maven-remote-resources-plugin:1.5:process (process-resource-bundles) 
@ flink-end-to-end-tests-common-kafka ---
[INFO]
[INFO] --- maven-resources-plugin:3.1.0:resources (default-resources) @ 
flink-end-to-end-tests-common-kafka ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory 
/Users/smile/Downloads/W/code/flink/apache/master/flink/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/resources
[INFO] Copying 3 resources
[INFO]
[INFO] --- maven-compiler-plugin:3.8.0:compile (default-compile) @ 
flink-end-to-end-tests-common-kafka ---
[INFO] Compiling 5 source files to 
/Users/smile/Downloads/W/code/flink/apache/master/flink/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/target/classes
[INFO] 
/Users/smile/Downloads/W/code/flink/apache/master/flink/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaContainerClient.java:
 
/Users/smile/Downloads/W/code/flink/apache/master/flink/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaContainerClient.java
 uses unchecked or unsafe operations.
[INFO] 
/Users/smile/Downloads/W/code/flink/apache/master/flink/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaContainerClient.java:
 Recompile with -Xlint:unchecked for details.
[INFO]
[INFO] --- maven-resources-plugin:3.1.0:testResources (default-testResources) @ 
flink-end-to-end-tests-common-kafka ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] Copying 2 resources
[INFO] Copying 3 resources
[INFO]
[INFO] --- maven-compiler-plugin:3.8.0:testCompile (default-testCompile) @ 
flink-end-to-end-tests-common-kafka ---
[INFO] Compiling 4 source files to 
/Users/smile/Downloads/W/code/flink/apache/master/flink/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/target/test-classes
[INFO] -
[ERROR] COMPILATION ERROR :
[INFO] -----
[ERROR] 
/Users/smile/Downloads/W/code/flink/apache/master/flink/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientSchemaRegistryITCase.java:[113,20]
 cannot access io.confluent.kafka.serialize

Re:Re: Test failed in flink-end-to-end-tests/flink-end-to-end-tests-common-kafka

2021-01-22 Thread Smile@LETTers
Yes, I've tried from both the root directory and the submodule. Neither of
them works, and the error messages are the same.



At 2021-01-21 23:22:12, "Robert Metzger"  wrote:

Since our CI system is able to build Flink, I believe it's a local issue.


Are you sure that the build is failing when you build Flink from the root 
directory (not calling maven from within a maven module?)


On Tue, Jan 19, 2021 at 11:19 AM Smile@LETTers  wrote:

Hi, 
I got an error when tried to compile & package Flink (version 1.12 & current 
master).
It can be reproduced by run 'mvn clean test' under 
flink-end-to-end-tests/flink-end-to-end-tests-common-kafka.


It seems that a necessary dependency for test scope was missing and some 
classes can not be found.
After adding the dependency kafka-avro-serializer to the pom of 
flink-end-to-end-tests/flink-end-to-end-tests-common-kafka everything goes well.


And I just wonder that is this a bug or I missed some local setting?


Best regards.
Smile



Re:Re: Re: Test failed in flink-end-to-end-tests/flink-end-to-end-tests-common-kafka

2021-01-24 Thread Smile@LETTers
Hi Matthias,
Sorry for the confusion; I meant kafka-schema-serializer rather than 
kafka-avro-serializer.

io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe lives in 
kafka-schema-serializer, and kafka-schema-serializer should be a dependency of 
kafka-avro-serializer according to their pom.xml files (see [1], [2]).
I couldn't resolve a valid kafka-avro-serializer.jar in my mirror so I 
downloaded it manually from [3] and installed it using:
mvn install:install-file -DgroupId=io.confluent 
-DartifactId=kafka-avro-serializer -Dversion=5.5.2 -Dpackaging=jar 
-Dfile=kafka-avro-serializer-5.5.2.jar
After that, I tried to build Flink and got the above exceptions. Then I tried 
to add the dependency of kafka-schema-serializer to 
flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/pom.xml (also 
manually installed it to my local maven repo) and everything went well.
I also tried to remove it from the pom.xml after installing, and the exception 
came back.
Maybe there was something wrong with the manually-installed 
kafka-avro-serializer?


[1]. 
https://mvnrepository.com/artifact/io.confluent/kafka-schema-serializer/usages
[2]. 
https://packages.confluent.io/maven/io/confluent/kafka-avro-serializer/5.5.2/kafka-avro-serializer-5.5.2.pom
[3]. https://mvnrepository.com/artifact/io.confluent/kafka-avro-serializer



At 2021-01-22 21:22:51, "Matthias Pohl"  wrote:

Hi Smile,
Have you used a clean checkout? I second Robert's statement considering that 
the dependency you're talking about is already part of 
flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/pom.xml. It also has 
the correct scope set both in master and release-1.12.


Best,
Matthias


On Fri, Jan 22, 2021 at 10:04 AM Smile@LETTers  wrote:

Yes, I've tried from both the root directory and the sub module. Neither or 
them works. And the error messages are the same.



At 2021-01-21 23:22:12, "Robert Metzger"  wrote:

Since our CI system is able to build Flink, I believe it's a local issue.


Are you sure that the build is failing when you build Flink from the root 
directory (not calling maven from within a maven module?)


On Tue, Jan 19, 2021 at 11:19 AM Smile@LETTers  wrote:

Hi, 
I got an error when tried to compile & package Flink (version 1.12 & current 
master).
It can be reproduced by run 'mvn clean test' under 
flink-end-to-end-tests/flink-end-to-end-tests-common-kafka.


It seems that a necessary dependency for test scope was missing and some 
classes can not be found.
After adding the dependency kafka-avro-serializer to the pom of 
flink-end-to-end-tests/flink-end-to-end-tests-common-kafka everything goes well.


And I just wonder that is this a bug or I missed some local setting?


Best regards.
Smile



Re:Re: Re: Re: Test failed in flink-end-to-end-tests/flink-end-to-end-tests-common-kafka

2021-01-25 Thread Smile@LETTers
Thanks, Matthias!
I tried your suggestion and it does work.
After installing kafka-avro-serializer together with its pom, I got some more 
errors about io.confluent:kafka-schema-registry-parent:pom:5.5.2, 
io.confluent:rest-utils-parent:pom:5.5.2, and so on. After manually installing 
all these dependencies with their poms, the build succeeded.

---
mvn install:install-file -DgroupId=io.confluent \
  -DartifactId=kafka-avro-serializer -Dversion=5.5.2 -Dpackaging=jar \
  -Dfile=kafka-avro-serializer-5.5.2.jar -DpomFile=kafka-avro-serializer-5.5.2.pom
mvn install:install-file -DgroupId=io.confluent \
  -DartifactId=kafka-schema-serializer -Dversion=5.5.2 -Dpackaging=jar \
  -Dfile=kafka-schema-serializer-5.5.2.jar -DpomFile=kafka-schema-serializer-5.5.2.pom
mvn install:install-file -DgroupId=io.confluent \
  -DartifactId=kafka-schema-registry-client -Dversion=5.5.2 -Dpackaging=jar \
  -Dfile=kafka-schema-registry-client-5.5.2.jar -DpomFile=kafka-schema-registry-client-5.5.2.pom
mvn install:install-file -DgroupId=io.confluent \
  -DartifactId=kafka-schema-registry-parent -Dversion=5.5.2 -Dpackaging=pom \
  -Dfile=kafka-schema-registry-parent-5.5.2.pom -DpomFile=kafka-schema-registry-parent-5.5.2.pom
mvn install:install-file -DgroupId=io.confluent -DartifactId=common \
  -Dversion=5.5.2 -Dpackaging=pom -Dfile=common-5.5.2.pom -DpomFile=common-5.5.2.pom
mvn install:install-file -DgroupId=io.confluent -DartifactId=common-parent \
  -Dversion=5.5.2 -Dpackaging=pom -Dfile=common-parent-5.5.2.pom -DpomFile=common-parent-5.5.2.pom
mvn install:install-file -DgroupId=io.confluent -DartifactId=common-config \
  -Dversion=5.5.2 -Dpackaging=jar -Dfile=common-config-5.5.2.jar -DpomFile=common-config-5.5.2.pom
mvn install:install-file -DgroupId=io.confluent -DartifactId=common-utils \
  -Dversion=5.5.2 -Dpackaging=jar -Dfile=common-utils-5.5.2.jar -DpomFile=common-utils-5.5.2.pom



At 2021-01-25 16:36:12, "Matthias Pohl"  wrote:

Hi Smile,
you missed installing the pom provided by mvnrepository.org [1]. Maven will 
install a basic pom if none is provided [2]. This basic pom file will not 
include any dependencies. You should be able to fix your problem by running 
your command above but adding the -DpomFile property with the pom file provided 
in [1]:


mvn install:install-file -DgroupId=io.confluent 
-DartifactId=kafka-avro-serializer -Dversion=5.5.2 -Dpackaging=jar 
-Dfile=kafka-avro-serializer-5.5.2.jar -DpomFile=kafka-avro-serializer-5.5.2.pom


[1] 
https://packages.confluent.io/maven/io/confluent/kafka-avro-serializer/5.5.2/kafka-avro-serializer-5.5.2.pom
[2] 
https://maven.apache.org/plugins/maven-install-plugin/install-file-mojo.html#pomFile


On Mon, Jan 25, 2021 at 8:25 AM Smile@LETTers  wrote:

Hi Matthias,
Sorry for my miss leading. I mean kafka-schema-serializer rather than 
kafka-avro-serializer.

io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe is in 
kafka-schema-serializer and kafka-schema-serializer should be a dependency of 
kafka-avro-serializer according to their pom.xml files(see [1], [2]).
I couldn't resolve a valid kafka-avro-serializer.jar in my mirror so I 
downloaded it manually from [3] and installed it using:
mvn install:install-file -DgroupId=io.confluent 
-DartifactId=kafka-avro-serializer -Dversion=5.5.2 -Dpackaging=jar 
-Dfile=kafka-avro-serializer-5.5.2.jar
After that, I tried to build Flink and got the above exceptions. Then I tried 
to add the dependency of kafka-schema-serializer to 
flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/pom.xml (also 
manually installed it to my local maven repo) and everything went well.
I also tried to remove it from the pom.xml after installing, and the exception 
came back.
Maybe there was something wrong with the manually-installed 
kafka-avro-serializer?


[1]. 
https://mvnrepository.com/artifact/io.confluent/kafka-schema-serializer/usages
[2]. 
https://packages.confluent.io/maven/io/confluent/kafka-avro-serializer/5.5.2/kafka-avro-serializer-5.5.2.pom
[3]. https://mvnrepository.com/artifact/io.confluent/kafka-avro-serializer



At 2021-01-22 21:22:51, "Matthias Pohl"  wrote:

Hi Smile,
Have you used a clean checkout? I second Robert's statement considering that 
the dependency you're talking about is already part of 
flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/pom.xml. It also has 
the correct scope set both in master and release-1.12.


Best,
Matthias


On Fri, Jan 22, 2021 at 10:04 AM Smile@LETTers  wrote:

Yes, I've tried from both the root directory and the sub module. Neither or 
them works. And the error messages are the same.



At 2021-01-21 23:22:12, "Robert Metzger"  wrote:

Since our CI system is able to build Flink, I believe it's a local issue.


Are you sure that the build is failing when you build Flink from the root 
directory (not calling maven from within a maven module?)


On Tue, Jan 19, 2021 at 11:19 AM Smile@LETTers  wrote:

Hi, 
I got an error when t

Re:Re: Trigger and completed Checkpointing do not appeared

2021-03-11 Thread Smile@LETTers
Hi,


In short, [1] is about whether the job will trigger checkpoints, and [2] is 
about which operators will take action when checkpoints are triggered.
If you use ExampleCountSource, flink-streaming-java should be a dependency in 
your pom.xml, and classes such as ListState, ListStateDescriptor, 
FunctionInitializationContext, FunctionSnapshotContext, CheckpointedFunction, 
and SourceFunction should be imported.


By the way, I'm not sure whether this mail will be displayed well because it's 
the first time for me to write such a formatted one. If not, please let me know.





Detailed reply for question 1:


CheckpointedFunction is not necessary to trigger or complete a checkpoint. 


A job will trigger checkpoints when all its tasks are running and 
checkpointing is enabled using the code in [1], such as 
env.enableCheckpointing(xxx). Your job in the first mail didn't trigger a 
checkpoint because the source was not running at the time of the first 
checkpoint (not because checkpointing wasn't enabled).


However, for some functions and operators, checkpoints make no sense. 


Take the code in that word count demo for an example:


source → flatMap → keyBy → sum → print


Assume the data:


aaa bbb aaa
bbb ccc
aaa
bbb
aaa ccc ddd


And assume the job failed because of some error after processing the 
first 3 lines.


aaa bbb aaa
bbb ccc
aaa
-- job fail
-- job recover
bbb
aaa ccc ddd


When the source operator and the sum operator recover from a failure, they'll 
need a checkpoint.
The source operator wants to know where to start (the 4th line) because some 
data may already have been processed before the failure. The sum operator wants 
to know the count of every word before the failure (aaa: 3, bbb: 2, ccc: 1) so 
that new sentences can be counted correctly on top of it.


However, the flatMap operator doesn't need a checkpoint at all. Whenever a 
sentence comes, split it. This operator requires nothing from a checkpoint to 
recover. 
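
For reference, a minimal sketch of the pipeline described above (CustomSource 
and the exact types are assumptions), with the stateful operators marked:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.util.Collector;

// stateful: remembers the reading position
DataStream<String> lines = env.addSource(new CustomSource());

lines.flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
            for (String word : line.split(" ")) {
                out.collect(Tuple2.of(word, 1));
            }
        })                                             // stateless: nothing to checkpoint
        .returns(Types.TUPLE(Types.STRING, Types.INT))
        .keyBy(t -> t.f0)
        .sum(1)                                        // stateful: running count per word
        .print();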


CheckpointedFunction in [2] is there to distinguish these stateful functions 
from all other functions (it's not the only way, but the most flexible one). 
See [3] and [4] for more information.



Detailed reply for question 2:


Here's my sample code for ExampleCountSource.java


import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class ExampleCountSource implements SourceFunction<Long>, CheckpointedFunction {
    private long count = 0L;
    private volatile boolean isRunning = true;

    private transient ListState<Long> checkpointedCount;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (isRunning && count < 1000) {
            // this synchronized block ensures that state checkpointing,
            // internal state updates and emission of elements are an atomic operation
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(count);
                count++;
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        this.checkpointedCount = context
                .getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("count", Long.class));

        if (context.isRestored()) {
            for (Long count : this.checkpointedCount.get()) {
                this.count = count;
            }
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        this.checkpointedCount.clear();
        this.checkpointedCount.add(count);
    }
}



[1]. 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/checkpointing.html#java
[2]. 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.html
[3]. 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/stateful-stream-processing.html
[4]. 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.html


Regards,
Smile