getting in an infinite loop while creating the dependency-reduced pom

2020-08-03 Thread Dongwon Kim
Hi,

I created a new Maven project (I'm using Maven 3.6.3) with the command below

> curl https://flink.apache.org/q/quickstart-SNAPSHOT.sh | bash -s 1.11.1
>
> and added the following dependencies to the <dependencies> section:

> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
>   <version>${flink.version}</version>
> </dependency>
>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
>   <version>${flink.version}</version>
>   <scope>test</scope>
> </dependency>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-runtime_${scala.binary.version}</artifactId>
>   <version>${flink.version}</version>
>   <scope>test</scope>
>   <classifier>tests</classifier>
> </dependency>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
>   <version>${flink.version}</version>
>   <scope>test</scope>
>   <classifier>tests</classifier>
> </dependency>
>
>
When executing "mvn clean package", the build gets stuck after the following output:

> [INFO] Scanning for projects...
> [INFO]
> [INFO] --< org.myorg.quickstart:quickstart 
> >---
> [INFO] Building Flink Quickstart Job 0.1
> [INFO] [ jar 
> ]-
> [INFO]
> [INFO] --- maven-clean-plugin:2.5:clean (default-clean) @ quickstart ---
> [INFO]
> [INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ 
> quickstart ---
> [INFO] Using 'UTF-8' encoding to copy filtered resources.
> [INFO] Copying 1 resource
> [INFO]
> [INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ quickstart 
> ---
> [INFO] Changes detected - recompiling the module!
> [INFO] Compiling 2 source files to 
> /Users/east.12/tmp/quickstart/target/classes
> [INFO]
> [INFO] --- maven-resources-plugin:2.6:testResources (default-testResources) @ 
> quickstart ---
> [INFO] Using 'UTF-8' encoding to copy filtered resources.
> [INFO] skip non existing resourceDirectory 
> /Users/east.12/tmp/quickstart/src/test/resources
> [INFO]
> [INFO] --- maven-compiler-plugin:3.1:testCompile (default-testCompile) @ 
> quickstart ---
> [INFO] No sources to compile
> [INFO]
> [INFO] --- maven-surefire-plugin:2.12.4:test (default-test) @ quickstart ---
> [INFO] No tests to run.
> [INFO]
> [INFO] --- maven-jar-plugin:2.4:jar (default-jar) @ quickstart ---
> [INFO] Building jar: /Users/east.12/tmp/quickstart/target/quickstart-0.1.jar
> [INFO]
> [INFO] --- maven-shade-plugin:3.1.1:shade (default) @ quickstart ---
> [INFO] Excluding org.slf4j:slf4j-api:jar:1.7.15 from the shaded jar.
> [INFO] Excluding org.apache.flink:force-shading:jar:1.11.1 from the shaded 
> jar.
> [INFO] Excluding org.apache.logging.log4j:log4j-slf4j-impl:jar:2.12.1 from 
> the shaded jar.
> [INFO] Excluding org.apache.logging.log4j:log4j-api:jar:2.12.1 from the 
> shaded jar.
> [INFO] Excluding org.apache.logging.log4j:log4j-core:jar:2.12.1 from the 
> shaded jar.
> [INFO] Including org.apache.flink:flink-connector-kafka_2.11:jar:1.11.1 in 
> the shaded jar.
> [INFO] Including org.apache.flink:flink-connector-kafka-base_2.11:jar:1.11.1 
> in the shaded jar.
> [INFO] Including org.apache.kafka:kafka-clients:jar:2.4.1 in the shaded jar.
> [INFO] Including com.github.luben:zstd-jni:jar:1.4.3-1 in the shaded jar.
> [INFO] Including org.xerial.snappy:snappy-java:jar:1.1.4 in the shaded jar.
> [INFO] Including org.lz4:lz4-java:jar:1.6.0 in the shaded jar.
> [INFO] Replacing original artifact with shaded artifact.
> [INFO] Replacing /Users/east.12/tmp/quickstart/target/quickstart-0.1.jar with 
> /Users/east.12/tmp/quickstart/target/quickstart-0.1-shaded.jar
> [INFO] Dependency-reduced POM written at: 
> /Users/east.12/tmp/quickstart/dependency-reduced-pom.xml
>
> Any help?

Thanks,

Dongwon


Re: StreamingFileSink: any risk parallelizing active buckets checkpointing?

2020-08-03 Thread Till Rohrmann
I've responded on the dev ML. Let's continue the discussion there:
https://lists.apache.org/thread.html/r251e395c759193d9c75f97b8bfc4917772219ea48bb1848ccc23d26e%40%3Cdev.flink.apache.org%3E

Cheers,
Till

On Thu, Jul 30, 2020 at 8:57 PM Paul Bernier  wrote:

> Hi experts,
>
>
>
> I am trying to use the S3 StreamingFileSink with a high number of active
> buckets (>1000). I found that the checkpointing duration grows linearly
> with the number of active buckets, which makes a high number of active
> buckets difficult to achieve. One reason is that each active bucket is
> snapshotted sequentially in a loop. Given that this operation involves
> waiting for some data to finish being uploaded to S3, it can become quite
> a long wait.
>
>
>
> My question is: could this loop be safely multi-threaded?
>
> Each Bucket seems independent (they do share the bucketWriter though). I
> have also done some basic prototyping and validation and it looks ok. So I
> am wondering whether I am overlooking anything and if my approach is viable.
>
>
>
> Note: the same approach would also need to be applied to the
> onSuccessfulCompletionOfCheckpoint step, which commits files to S3 in a
> similar while loop.
>
>
>
> Thank you.
>
>
>
> Paul
>
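A minimal sketch of the kind of multi-threading being asked about above is shown below. It is only an illustration of fanning the per-bucket snapshot work out to a thread pool; the bucket type, the per-bucket snapshot callback and the pool size are placeholders, and it says nothing about whether the real StreamingFileSink internals are actually thread-safe.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative sketch only: snapshot each active bucket on a thread pool and
// wait for all of them before acknowledging the checkpoint. "B" stands in for
// the sink's bucket type; SnapshotFn stands in for the per-bucket snapshot call.
class ParallelBucketSnapshotSketch<B> {

    interface SnapshotFn<T> {
        void snapshot(T bucket) throws Exception;
    }

    private final ExecutorService pool = Executors.newFixedThreadPool(8);

    void snapshotAll(Map<String, B> activeBuckets, SnapshotFn<B> snapshotFn) {
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (B bucket : activeBuckets.values()) {
            futures.add(CompletableFuture.runAsync(() -> {
                try {
                    snapshotFn.snapshot(bucket);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }, pool));
        }
        // The checkpoint must still see every bucket's state, so block until
        // all snapshots have finished (or one of them has failed).
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
    }
}
```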


Re: getting in an infinite loop while creating the dependency-reduced pom

2020-08-03 Thread Chesnay Schepler

https://issues.apache.org/jira/browse/MSHADE-148
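One workaround that is often suggested for that issue is to stop the shade plugin from generating the dependency-reduced POM at all (at the cost of not having the reduced POM). A hedged sketch of the change in the quickstart's pom.xml; the surrounding plugin configuration is assumed rather than copied from the quickstart:

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <configuration>
    <!-- Skip the dependency-reduced POM generation that hangs in MSHADE-148. -->
    <createDependencyReducedPom>false</createDependencyReducedPom>
  </configuration>
</plugin>
```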

On 03/08/2020 10:16, Dongwon Kim wrote:

Hi,

I created a new Maven project (I'm using Maven 3.6.3) with the command below:

curl https://flink.apache.org/q/quickstart-SNAPSHOT.sh | bash -s 1.11.1

and added the following dependencies to the <dependencies> section:


<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
  <version>${flink.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
  <version>${flink.version}</version>
  <scope>test</scope>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-runtime_${scala.binary.version}</artifactId>
  <version>${flink.version}</version>
  <scope>test</scope>
  <classifier>tests</classifier>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
  <version>${flink.version}</version>
  <scope>test</scope>
  <classifier>tests</classifier>
</dependency>


When executing "mvn clean package", the build gets stuck after the following output:

[INFO] Scanning for projects...
[INFO]
[INFO] --< org.myorg.quickstart:quickstart 
>---
[INFO] Building Flink Quickstart Job 0.1
[INFO] [ jar 
]-
[INFO]
[INFO] --- maven-clean-plugin:2.5:clean (default-clean) @ quickstart ---
[INFO]
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ 
quickstart ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] Copying 1 resource
[INFO]
[INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ quickstart 
---
[INFO] Changes detected - recompiling the module!
[INFO] Compiling 2 source files to 
/Users/east.12/tmp/quickstart/target/classes
[INFO]
[INFO] --- maven-resources-plugin:2.6:testResources (default-testResources) 
@ quickstart ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory 
/Users/east.12/tmp/quickstart/src/test/resources
[INFO]
[INFO] --- maven-compiler-plugin:3.1:testCompile (default-testCompile) @ 
quickstart ---
[INFO] No sources to compile
[INFO]
[INFO] --- maven-surefire-plugin:2.12.4:test (default-test) @ quickstart ---
[INFO] No tests to run.
[INFO]
[INFO] --- maven-jar-plugin:2.4:jar (default-jar) @ quickstart ---
[INFO] Building jar: /Users/east.12/tmp/quickstart/target/quickstart-0.1.jar
[INFO]
[INFO] --- maven-shade-plugin:3.1.1:shade (default) @ quickstart ---
[INFO] Excluding org.slf4j:slf4j-api:jar:1.7.15 from the shaded jar.
[INFO] Excluding org.apache.flink:force-shading:jar:1.11.1 from the shaded 
jar.
[INFO] Excluding org.apache.logging.log4j:log4j-slf4j-impl:jar:2.12.1 from 
the shaded jar.
[INFO] Excluding org.apache.logging.log4j:log4j-api:jar:2.12.1 from the 
shaded jar.
[INFO] Excluding org.apache.logging.log4j:log4j-core:jar:2.12.1 from the 
shaded jar.
[INFO] Including org.apache.flink:flink-connector-kafka_2.11:jar:1.11.1 in 
the shaded jar.
[INFO] Including 
org.apache.flink:flink-connector-kafka-base_2.11:jar:1.11.1 in the shaded jar.
[INFO] Including org.apache.kafka:kafka-clients:jar:2.4.1 in the shaded jar.
[INFO] Including com.github.luben:zstd-jni:jar:1.4.3-1 in the shaded jar.
[INFO] Including org.xerial.snappy:snappy-java:jar:1.1.4 in the shaded jar.
[INFO] Including org.lz4:lz4-java:jar:1.6.0 in the shaded jar.
[INFO] Replacing original artifact with shaded artifact.
[INFO] Replacing /Users/east.12/tmp/quickstart/target/quickstart-0.1.jar 
with /Users/east.12/tmp/quickstart/target/quickstart-0.1-shaded.jar
[INFO] Dependency-reduced POM written at: 
/Users/east.12/tmp/quickstart/dependency-reduced-pom.xml

Any help?

Thanks,

Dongwon





Re: is it possible one task manager stuck and still fetching data from Kinesis?

2020-08-03 Thread Till Rohrmann
Hi Terry,

I am not a Kinesis expert, which is why I've pulled in Thomas and Max, who
might know more about Flink's Kinesis behaviour. What could help, though,
would be access to the Flink cluster logs to see whether something fishy is
going on.

Cheers,
Till

On Fri, Jul 31, 2020 at 4:41 AM Terry Chia-Wei Wu  wrote:

> We are running Flink 1.10 with about 900+ task managers and Kinesis as an
> input stream. The problem we are having now is that only the Max Age of a
> Kinesis shard keeps growing while the average age is very low, meaning most
> shards have a very low age. We already checked for data skew but the data
> is quite uniformly distributed. Any idea how this can happen and how to
> debug this issue? I'm wondering whether it is possible for one TM's
> operator to be stuck while the source keeps fetching data, so that the
> Kinesis age keeps growing.
>
> Terry
>
>
>


Re: JDBCOutputFormat dependency loading error

2020-08-03 Thread Till Rohrmann
Hi Flavio,

I am not a JDBC expert but it looks as if you try to
load com.mysql.cj.jdbc.Driver which is not contained
in mariadb-java-client-2.6.0.jar. mariadb-java-client-2.6.0.jar only
contains org/mariadb/jdbc/Driver.class. com.mysql.cj.jdbc.Driver can be
found in mysql-connector-java.jar, though.

Hence I believe that you are missing some dependencies in your user jar to
make your job run. Please check from where com.mysql.cj.jdbc.Driver is
being loaded when running the job from the IDE.

Cheers,
Till

On Fri, Jul 31, 2020 at 4:55 PM Flavio Pompermaier 
wrote:

> Hi to all,
> I'm trying to run my DataSet job on Flink 1.11.0 and I'm connecting toward
> Mariadb in my code.
> I've put the mariadb-java-client-2.6.0.jar in the lib directory and in the
> pom.xml I set that dependency as provided. The code runs successfully from
> the Ide but when I try to run the code on the cluster I get the following
> error:
>
> Caused by: java.lang.ClassNotFoundException: com.mysql.cj.jdbc.Driver
> at java.net.URLClassLoader.findClass(URLClassLoader.java:471) ~[?:?]
> at java.lang.ClassLoader.loadClass(ClassLoader.java:589) ~[?:?]
> at
> org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
> at
> org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
> at
> org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
> at java.lang.ClassLoader.loadClass(ClassLoader.java:522) ~[?:?]
> at java.lang.Class.forName0(Native Method) ~[?:?]
> at java.lang.Class.forName(Class.java:315) ~[?:?]
> at
> org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider.getConnection(SimpleJdbcConnectionProvider.java:52)
> ~myApp.jar:?]
> at
> org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat.establishConnection(AbstractJdbcOutputFormat.java:66)
> ~myApp.jar:?]
> at
> org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat.open(AbstractJdbcOutputFormat.java:59)
> ~myApp.jar:?]
> at
> org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.open(JDBCOutputFormat.java:82)
> ~myApp.jar:?]
> at
> org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:205)
> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
> at java.lang.Thread.run(Thread.java:834) ~[?:?]
>
> What should I do?
>
> Thanks in advance,
> Flavio
>


Re: Is there a way to get file "metadata" as part of stream?

2020-08-03 Thread Till Rohrmann
Hi John,

out of the box, Flink does not provide this functionality. However, you
might be able to write your own CsvInputFormat which overrides fillRecord
so that it generates a CSV record where the first field contains the
filename. You can obtain the filename from the field currentSplit. I
haven't tried it out myself, though.

Cheers,
Till
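A minimal sketch of that idea follows. It is hedged: the constructor and fillRecord signatures should be checked against the Flink version in use, and because an extra filename field is prepended, the caller must also supply matching row type information when building the stream.

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.io.RowCsvInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.types.Row;

// Sketch: a RowCsvInputFormat subclass that prepends the name of the file the
// current split belongs to as the first field of every emitted Row.
public class FileNameRowCsvInputFormat extends RowCsvInputFormat {

    public FileNameRowCsvInputFormat(Path filePath, TypeInformation[] fieldTypes) {
        super(filePath, fieldTypes);
    }

    @Override
    protected Row fillRecord(Row reuse, Object[] parsedValues) {
        // currentSplit is inherited from FileInputFormat and identifies the
        // split (and therefore the file) currently being read.
        String fileName = currentSplit == null ? null : currentSplit.getPath().toString();
        Row row = new Row(parsedValues.length + 1);
        row.setField(0, fileName);
        for (int i = 0; i < parsedValues.length; i++) {
            row.setField(i + 1, parsedValues[i]);
        }
        return row;
    }
}
```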

On Fri, Jul 31, 2020 at 5:54 PM John Smith  wrote:

> Hi, so reading a CSV file using env.readFile() with RowCsvInputFormat.
>
> Is there a way to get the filename as part of the row stream?
>
> The file contains a unique identifier to tag the rows with.
>


Re: Behavior for flink job running on K8S failed after restart strategy exhausted

2020-08-03 Thread Till Rohrmann
Hi Eleanore,

how are you deploying Flink exactly? Are you using the application mode
with native K8s support to deploy a cluster [1] or are you manually
deploying a per-job mode [2]?

I believe the problem might be that we terminate the Flink process with a
non-zero exit code if the job reaches the ApplicationStatus.FAILED [3].

cc Yang Wang have you observed a similar behavior when running Flink in
per-job mode on K8s?

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/native_kubernetes.html#flink-kubernetes-application
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/kubernetes.html#job-cluster-resource-definitions
[3]
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ApplicationStatus.java#L32

On Fri, Jul 31, 2020 at 6:26 PM Eleanore Jin  wrote:

> Hi Experts,
>
> I have a flink cluster (per job mode) running on kubernetes. The job is
> configured with restart strategy
>
> restart-strategy.fixed-delay.attempts: 3
> restart-strategy.fixed-delay.delay: 10 s
>
>
> So after 3 retries, the job will be marked as FAILED and the pods stop
> running. However, Kubernetes will then restart the job again because the
> available replicas do not match the desired count.
>
> I wonder what are the suggestions for such a scenario? How should I
> configure the flink job running on k8s?
>
> Thanks a lot!
> Eleanore
>


Re: Kafka source, committing and retries

2020-08-03 Thread Till Rohrmann
Hi Jack,

I do not fully understand what you want to achieve here. Could you please
give me a bit more context? Are you asking how Flink realizes exactly once
processing guarantees with its connectors?

Cheers,
Till

On Fri, Jul 31, 2020 at 8:56 PM Jack Phelan 
wrote:

> Scenario
> ===
>
> A partition that Flink is reading:
> [ 1 - 2 - 3 - 4 - 5 - 6 - 7 - |  8 _ 9 _ 10 _ 11 | 12 ~ 13 ]
> [.   Committed.   | In flight  | unread  ]
>
> Kafka basically breaks off pieces of the end of the queue and shoves them
> downstream for processing?
>
> So suppose that, semantically:
> - 8 & 10 succeed (api call success)
> - 9 & 11 fail (api failure).
>
> Failure Handling options
> ==
>
> Basically we have two options to handle failures?
>
> A. Try/catch to deadletter queue
> ```
> try {
> api.write(8, 9, 10, 11);
> } catch E {
> // 9, 11 failed to write to the api so we deadletter them
>
> deadletterQueue.write(E.failed_set())
> }
> ```
>
> B. Or it can fail - which will retry the batch?
> ```
> api.write(8, 9, 10, 11);
> // 9, 11 failed to write to the api
> ```
>
> In situation (B.), we're rewriting 8 and 10 to the api, which is bad, so
> situation (A.) seems better.
>
>
> Challenge I can't understand
> ==
>
> However in (A.) we then do something with the queue:
>
> A2. Try/catch to another deadletter queue?
> ```
> try {
> api.write(9, 11);
> } catch E {
> //11 failed to write to the api
> deadletterQueue2.write(E.failed_set())
> }
> ```
>
> Do you see what I mean? Is it turtles all the way down?
>
> Should I create a separate index of semantic outcome? Where should it live?
>
> Should I just keep things in the queue until
>


Re: Error with Flink-Gelly, lastJobExecutionResult is null for ExecutionEnvironment

2020-08-03 Thread Till Rohrmann
The discussion is held on the dev mailing list:
https://lists.apache.org/thread.html/rfed34be1651aa15fbdd0b3cac0d5607fa34886c1cf1aacf06684e995%40%3Cdev.flink.apache.org%3E
.

Cheers,
Till

On Sat, Aug 1, 2020 at 8:16 AM Xia Rui  wrote:

> Hello, everyone.
>
> I am trying to use Flink-Gelly. The version of Flink I used is 1.11 (I
> also tried 1.12, and it does not work either).
>
>
>
> Following the instruction in
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/libs/gelly/
> .
>
> First, I build my Flink code with:
>
> (1)   git clone https://github.com/apache/flink.git
>
> (2)   mvn package -DskipTests
>
> Then, I copy some Gelly lib to Flink’s lib directory
>
> (3)   cp opt/flink-gelly_2.11-1.11-SNAPSHOT.jar lib/
>
> Next, I start the cluster with standalone mode
>
> (4)   ./bin/start-cluster.sh
>
> By the way, all the Flink configuration in “flink-conf.yaml” is set as
> default.
>
> Finally, I run the example code from Gelly and get an error.
>
> (5)   ./bin/flink run
> examples/gelly/flink-gelly-examples_2.11-1.11-SNAPSHOT.jar \
>
> --algorithm GraphMetrics --order directed \
>
> --input RMatGraph --type integer --scale 20 --simplify directed \
>
> --output print
>
> This is the error message:
>
>
>
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error: No result found for job, was execute() called
> before getting the result?
>
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
>
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>
> at
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
>
> at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
>
> at
> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
>
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
>
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
>
> at
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>
> at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
>
> Caused by: java.lang.NullPointerException: No result found for job, was
> execute() called before getting the result?
>
> at
> org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:75)
>
> at
> org.apache.flink.graph.AnalyticHelper.getAccumulator(AnalyticHelper.java:81)
>
> at
> org.apache.flink.graph.library.metric.directed.VertexMetrics.getResult(VertexMetrics.java:116)
>
> at
> org.apache.flink.graph.library.metric.directed.VertexMetrics.getResult(VertexMetrics.java:56)
>
> at
> org.apache.flink.graph.drivers.GraphMetrics.printAnalytics(GraphMetrics.java:113)
>
> at org.apache.flink.graph.Runner.execute(Runner.java:458)
>
> at org.apache.flink.graph.Runner.main(Runner.java:507)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:498)
>
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
>
> ... 8 more
>
>
>
> I have checked the log of jobmanager and taskmanager, and no error is
> reported.
>
>
>
> The error seems to imply that the ‘lastJobExecutionResult’ in
> ‘ExecutionEnvironment’ is null. But I have already submitted the job, and
> it is running.
>
>
>
> Could you share me some idea about my problem?
>
>
>
> Thank you very much.
>
> Rui Xia
>


Re: Does Flink automatically apply any backpressure ?

2020-08-03 Thread Danny Chan
Yes, just like Jake said, back pressure happens automatically and usually
there is no need to tweak it. You can also monitor it via the back pressure
metrics, see [1].

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/back_pressure.html

Best,
Danny Chan
On Jul 31, 2020 at 10:28 AM +0800, Jake wrote:
>
> Hi Suraj Puvvada
>
> Yes, Flink back pressure depends on the Flink task buffers. The processing
> task reports its remaining buffer size to the source, and the source will
> slow down.
>
> https://www.ververica.com/blog/how-flink-handles-backpressure
>
> Jake
>
>
> > On Jul 31, 2020, at 2:48 AM, Suraj Puvvada  wrote:
> >
> > Hello
> >
> > I am trying to understand if Flink has a mechanism to automatically apply
> > backpressure by throttling operators. For example, if I have a Process
> > function that reads from a Kafka source and writes to a Kafka sink, and
> > the process function is slow, will the Kafka source be automatically
> > throttled?
> >
> > Thanks
> > Suraj
>


Re: FlinkML status

2020-08-03 Thread Till Rohrmann
Hi Mohamed,

the development of FlinkML has been stopped in favour of a new machine
learning library which you can find here [1]. Be aware that this library is
still under development.

[1] https://github.com/apache/flink/tree/master/flink-ml-parent

Cheers,
Till

On Sat, Aug 1, 2020 at 10:35 AM Mohamed Haseeb  wrote:

> Hi,
>
> What's the current status of FlinkML? Is it still part of Flink? The last
> Flink release that has documentation about it is 1.8.
>
> Thanks,
> M. Haseeb
>


Re: How long it took a Flink Job to start up ?

2020-08-03 Thread Till Rohrmann
Hi Vijay,

one way to solve this problem is to use Flink's REST API. By querying
REST_ENDPOINT:PORT/jobs/JOB_ID [1] you obtain the job information. The job
information contains the start-time for the whole job and the start-time
for every vertex of the job. By subtracting the job's start time from the
maximum start time of all vertices you should obtain the time it takes to
bring up the whole topology (this only applies to streaming jobs since in
batch jobs not all vertices/operators have to run at the same time).

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jobs-jobid

Cheers,
Till
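A minimal sketch of this computation (hedged: it assumes the documented "start-time" fields of the /jobs/:jobid response, Jackson on the classpath, and placeholder endpoint/job-id values):

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class JobStartupTime {
    public static void main(String[] args) throws Exception {
        // Placeholders: point this at your REST endpoint and pass the job id as args[0].
        String url = "http://localhost:8081/jobs/" + args[0];

        HttpResponse<String> response = HttpClient.newHttpClient().send(
                HttpRequest.newBuilder(URI.create(url)).GET().build(),
                HttpResponse.BodyHandlers.ofString());

        JsonNode job = new ObjectMapper().readTree(response.body());
        long jobStart = job.get("start-time").asLong();
        long latestVertexStart = jobStart;
        for (JsonNode vertex : job.get("vertices")) {
            latestVertexStart = Math.max(latestVertexStart, vertex.get("start-time").asLong());
        }
        // Difference between the latest vertex start and the job start approximates
        // how long it took to bring up the whole topology.
        System.out.println("Approximate startup time: " + (latestVertexStart - jobStart) + " ms");
    }
}
```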

On Fri, Jul 31, 2020 at 7:56 PM Vijay Balakrishnan 
wrote:

> Hi,
> I am trying to figure out how long it took a Flink job to start up.
> I used /jobs/overview and it gave me just the start-time as a long value.
> The Flink Dashboard UI shows the Start-Time as a user-friendly date. I am
> trying to find or compute how long it took for the Flink job to start up.
> Right now, I am manually looking at when the JobManager is up and running
> in the logs and then subtracting the UI start time to get an approximate
> idea. I am trying to automate the process.
>
> Attaching 2 screenshots of the /jobs/overview response and the Flink
> DashBoard UI Start Time.
>
>
> TIA,
>


Re: JDBCOutputFormat dependency loading error

2020-08-03 Thread Flavio Pompermaier
Yes Till, I was figuring out that I was using 2 different connectors and I
forgot the mysql jar... I was going to test and tell you if the problem was
solved!

On Mon, Aug 3, 2020 at 10:50 AM Till Rohrmann  wrote:

> Hi Flavio,
>
> I am not a JDBC expert but it looks as if you try to
> load com.mysql.cj.jdbc.Driver which is not contained
> in mariadb-java-client-2.6.0.jar. mariadb-java-client-2.6.0.jar only
> contains org/mariadb/jdbc/Driver.class. com.mysql.cj.jdbc.Driver can be
> found in mysql-connector-java.jar, though.
>
> Hence I believe that you are missing some dependencies in your user jar to
> make your job run. Please check from where com.mysql.cj.jdbc.Driver is
> being loaded when running the job from the IDE.
>
> Cheers,
> Till
>
> On Fri, Jul 31, 2020 at 4:55 PM Flavio Pompermaier 
> wrote:
>
>> Hi to all,
>> I'm trying to run my DataSet job on Flink 1.11.0 and I'm connecting
>> toward Mariadb in my code.
>> I've put the mariadb-java-client-2.6.0.jar in the lib directory and in
>> the pom.xml I set that dependency as provided. The code runs successfully
>> from the Ide but when I try to run the code on the cluster I get the
>> following error:
>>
>> Caused by: java.lang.ClassNotFoundException: com.mysql.cj.jdbc.Driver
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:471) ~[?:?]
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:589) ~[?:?]
>> at
>> org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
>> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>> at
>> org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
>> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>> at
>> org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
>> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:522) ~[?:?]
>> at java.lang.Class.forName0(Native Method) ~[?:?]
>> at java.lang.Class.forName(Class.java:315) ~[?:?]
>> at
>> org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider.getConnection(SimpleJdbcConnectionProvider.java:52)
>> ~myApp.jar:?]
>> at
>> org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat.establishConnection(AbstractJdbcOutputFormat.java:66)
>> ~myApp.jar:?]
>> at
>> org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat.open(AbstractJdbcOutputFormat.java:59)
>> ~myApp.jar:?]
>> at
>> org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.open(JDBCOutputFormat.java:82)
>> ~myApp.jar:?]
>> at
>> org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:205)
>> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>> at java.lang.Thread.run(Thread.java:834) ~[?:?]
>>
>> What should I do?
>>
>> Thanks in advance,
>> Flavio
>>
>


Per-job mode job restart and HA configuration

2020-08-03 Thread V N, Suchithra (Nokia - IN/Bangalore)
Hello,

I am using Flink version 1.10.1 in a Kubernetes environment. In per-job mode
of Flink, do we need ZooKeeper and the HA parameters to restart the job and
achieve HA? I am unsure because the job jar is part of the Docker image itself.

Thanks,
Suchithra


Re: JDBCOutputFormat dependency loading error

2020-08-03 Thread Till Rohrmann
Glad to hear it!

On Mon, Aug 3, 2020 at 11:59 AM Flavio Pompermaier 
wrote:

> Yes Till, I was figuring out that I was using 2 different connectors and I
> forgot the mysql jar..I was going to test and tell if the problem was
> solved!
>
> On Mon, Aug 3, 2020 at 10:50 AM Till Rohrmann 
> wrote:
>
>> Hi Flavio,
>>
>> I am not a JDBC expert but it looks as if you try to
>> load com.mysql.cj.jdbc.Driver which is not contained
>> in mariadb-java-client-2.6.0.jar. mariadb-java-client-2.6.0.jar only
>> contains org/mariadb/jdbc/Driver.class. com.mysql.cj.jdbc.Driver can be
>> found in mysql-connector-java.jar, though.
>>
>> Hence I believe that you are missing some dependencies in your user jar
>> to make your job run. Please check from where com.mysql.cj.jdbc.Driver is
>> being loaded when running the job from the IDE.
>>
>> Cheers,
>> Till
>>
>> On Fri, Jul 31, 2020 at 4:55 PM Flavio Pompermaier 
>> wrote:
>>
>>> Hi to all,
>>> I'm trying to run my DataSet job on Flink 1.11.0 and I'm connecting
>>> toward Mariadb in my code.
>>> I've put the mariadb-java-client-2.6.0.jar in the lib directory and in
>>> the pom.xml I set that dependency as provided. The code runs successfully
>>> from the Ide but when I try to run the code on the cluster I get the
>>> following error:
>>>
>>> Caused by: java.lang.ClassNotFoundException: com.mysql.cj.jdbc.Driver
>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:471) ~[?:?]
>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:589) ~[?:?]
>>> at
>>> org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
>>> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>> at
>>> org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
>>> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>> at
>>> org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
>>> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:522) ~[?:?]
>>> at java.lang.Class.forName0(Native Method) ~[?:?]
>>> at java.lang.Class.forName(Class.java:315) ~[?:?]
>>> at
>>> org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider.getConnection(SimpleJdbcConnectionProvider.java:52)
>>> ~myApp.jar:?]
>>> at
>>> org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat.establishConnection(AbstractJdbcOutputFormat.java:66)
>>> ~myApp.jar:?]
>>> at
>>> org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat.open(AbstractJdbcOutputFormat.java:59)
>>> ~myApp.jar:?]
>>> at
>>> org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.open(JDBCOutputFormat.java:82)
>>> ~myApp.jar:?]
>>> at
>>> org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:205)
>>> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>>> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>>> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>> at java.lang.Thread.run(Thread.java:834) ~[?:?]
>>>
>>> What should I do?
>>>
>>> Thanks in advance,
>>> Flavio
>>>
>>


Re: [DISCUSS] FLIP-133: Rework PyFlink Documentation

2020-08-03 Thread Seth Wiesman
Hi Jincheng,

I'm very excited to see the enthusiasm for documentation work but I am
concerned about the communities long term ability to maintain this
contribution. In particular, I'm concerned that this proposal duplicates a
lot of content that will quickly get out of sync. So far the community does
not have a great track record for maintaining documentation after its
initial contribution.

In particular, I do not believe the following items need to be copied:

DataTypes
Built-in functions
Connectors
SQL
Catalogs
Configurations

Another issue is that this proposal feels like it is documenting PyFlink
separately from the rest of the project. Things like the cookbook and
tutorial should be under the Try Flink section of the documentation.

Seth


On Mon, Aug 3, 2020 at 1:08 AM jincheng sun 
wrote:

> Would be great if you could join the contribution of PyFlink
> documentation @Marta !
> Thanks for all of the positive feedback. I will start a formal vote then
> later...
>
> Best,
> Jincheng
>
>
> On Mon, Aug 3, 2020 at 9:56 AM, Shuiqiang Chen wrote:
>
> > Hi jincheng,
> >
> > Thanks for the discussion. +1 for the FLIP.
> >
> > A well-organized documentation will greatly improve the efficiency and
> > experience for developers.
> >
> > Best,
> > Shuiqiang
> >
> > On Sat, Aug 1, 2020 at 8:42 AM, Hequn Cheng wrote:
> >
> >> Hi Jincheng,
> >>
> >> Thanks a lot for raising the discussion. +1 for the FLIP.
> >>
> >> I think this will bring big benefits for the PyFlink users. Currently,
> >> the Python TableAPI document is hidden deeply under the TableAPI&SQL tab
> >> which makes it quite unreadable. Also, the PyFlink documentation is
> mixed
> >> with Java/Scala documentation. It is hard for users to have an overview
> of
> >> all the PyFlink documents. As more and more functionalities are added
> into
> >> PyFlink, I think it's time for us to refactor the document.
> >>
> >> Best,
> >> Hequn
> >>
> >>
> >> On Fri, Jul 31, 2020 at 3:43 PM Marta Paes Moreira  >
> >> wrote:
> >>
> >>> Hi, Jincheng!
> >>>
> >>> Thanks for creating this detailed FLIP, it will make a big difference
> in
> >>> the experience of Python developers using Flink. I'm interested in
> >>> contributing to this work, so I'll reach out to you offline!
> >>>
> >>> Also, thanks for sharing some information on the adoption of PyFlink,
> >>> it's
> >>> great to see that there are already production users.
> >>>
> >>> Marta
> >>>
> >>> On Fri, Jul 31, 2020 at 5:35 AM Xingbo Huang 
> wrote:
> >>>
> >>> > Hi Jincheng,
> >>> >
> >>> > Thanks a lot for bringing up this discussion and the proposal.
> >>> >
> >>> > Big +1 for improving the structure of PyFlink doc.
> >>> >
> >>> > It will be very friendly to give PyFlink users a unified entrance to
> >>> learn
> >>> > PyFlink documents.
> >>> >
> >>> > Best,
> >>> > Xingbo
> >>> >
> >>> >> On Fri, Jul 31, 2020 at 11:00 AM, Dian Fu wrote:
> >>> >
> >>> >> Hi Jincheng,
> >>> >>
> >>> >> Thanks a lot for bringing up this discussion and the proposal. +1 to
> >>> >> improve the Python API doc.
> >>> >>
> >>> >> I have received many feedbacks from PyFlink beginners about
> >>> >> the PyFlink doc, e.g. the materials are too few, the Python doc is
> >>> mixed
> >>> >> with the Java doc and it's not easy to find the docs he wants to
> know.
> >>> >>
> >>> >> I think it would greatly improve the user experience if we can have
> >>> one
> >>> >> place which includes most knowledges PyFlink users should know.
> >>> >>
> >>> >> Regards,
> >>> >> Dian
> >>> >>
> >>> >> On Jul 31, 2020 at 10:14 AM, jincheng sun wrote:
> >>> >>
> >>> >> Hi folks,
> >>> >>
> >>> >> Since the release of Flink 1.11, users of PyFlink have continued to
> >>> >> grow. As far as I know, many companies have used PyFlink for data
> >>> >> analysis, and operation and maintenance monitoring businesses have
> >>> >> been put into production (such as 聚美优品 (Jumei) [1] and 浙江墨芷
> >>> >> (Mozhi) [2]). According to the feedback we received, the current
> >>> >> documentation is not very friendly to PyFlink users. There are two
> >>> >> shortcomings:
> >>> >>
> >>> >> - Python related content is mixed in the Java/Scala documentation,
> >>> which
> >>> >> makes it difficult for users who only focus on PyFlink to read.
> >>> >> - There is already a "Python Table API" section in the Table API
> >>> document
> >>> >> to store PyFlink documents, but the number of articles is small and
> >>> the
> >>> >> content is fragmented. It is difficult for beginners to learn from
> it.
> >>> >>
> >>> >> In addition, FLIP-130 introduced the Python DataStream API. Many
> >>> >> documents will be added for those new APIs. In order to increase the
> >>> >> readability and maintainability of the PyFlink document, Wei Zhong
> >>> and me
> >>> >> have discussed offline and would like to rework it via this FLIP.
> >>> >>
> >>> >> We will rework the document around the following three objectives:
> >>> >>
> >>> >> - Add a separate section for Python API under the "Application
> >>> >> Development" section.
> >>> >> - Rest

Re: Per-job mode job restart and HA configuration

2020-08-03 Thread Khachatryan Roman
Hi Suchithra,

Yes, you need to pass these parameters to standalone-job.sh in Kubernetes
job definition.

I'm pulling in Patrick as he might know this subject better.

Regards,
Roman
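For reference, a hedged sketch of the usual ZooKeeper-based HA settings in flink-conf.yaml; the quorum address, storage path and cluster id are placeholders to adapt, and they are needed regardless of the job jar being baked into the Docker image:

```yaml
# Sketch of ZooKeeper HA configuration for flink-conf.yaml; values are placeholders.
high-availability: zookeeper
high-availability.zookeeper.quorum: zk-1:2181,zk-2:2181,zk-3:2181
high-availability.storageDir: s3://my-bucket/flink/ha
# Use a distinct id per job cluster so different jobs do not share HA state.
high-availability.cluster-id: /my-per-job-cluster
```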


On Mon, Aug 3, 2020 at 12:24 PM V N, Suchithra (Nokia - IN/Bangalore) <
suchithra@nokia.com> wrote:

> Hello,
>
>
>
> I am using Flink version 1.10.1 in a Kubernetes environment. In per-job
> mode of Flink, do we need ZooKeeper and the HA parameters to restart the
> job and achieve HA? I am unsure because the job jar is part of the Docker
> image itself.
>
>
>
> Thanks,
>
> Suchithra
>


Re: JDBCOutputFormat dependency loading error

2020-08-03 Thread Flavio Pompermaier
Yes, the problem indeed was mine (2 different connectors for MariaDB, both
mysql and mariadb-client). Sorry for the confusion.

On Mon, Aug 3, 2020 at 12:26 PM Till Rohrmann  wrote:

> Glad to hear it!
>
> On Mon, Aug 3, 2020 at 11:59 AM Flavio Pompermaier 
> wrote:
>
>> Yes Till, I was figuring out that I was using 2 different connectors and
>> I forgot the mysql jar..I was going to test and tell if the problem was
>> solved!
>>
>> On Mon, Aug 3, 2020 at 10:50 AM Till Rohrmann 
>> wrote:
>>
>>> Hi Flavio,
>>>
>>> I am not a JDBC expert but it looks as if you try to
>>> load com.mysql.cj.jdbc.Driver which is not contained
>>> in mariadb-java-client-2.6.0.jar. mariadb-java-client-2.6.0.jar only
>>> contains org/mariadb/jdbc/Driver.class. com.mysql.cj.jdbc.Driver can be
>>> found in mysql-connector-java.jar, though.
>>>
>>> Hence I believe that you are missing some dependencies in your user jar
>>> to make your job run. Please check from where com.mysql.cj.jdbc.Driver is
>>> being loaded when running the job from the IDE.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Fri, Jul 31, 2020 at 4:55 PM Flavio Pompermaier 
>>> wrote:
>>>
 Hi to all,
 I'm trying to run my DataSet job on Flink 1.11.0 and I'm connecting
 toward Mariadb in my code.
 I've put the mariadb-java-client-2.6.0.jar in the lib directory and in
 the pom.xml I set that dependency as provided. The code runs successfully
 from the Ide but when I try to run the code on the cluster I get the
 following error:

 Caused by: java.lang.ClassNotFoundException: com.mysql.cj.jdbc.Driver
 at java.net.URLClassLoader.findClass(URLClassLoader.java:471) ~[?:?]
 at java.lang.ClassLoader.loadClass(ClassLoader.java:589) ~[?:?]
 at
 org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
 ~[flink-dist_2.12-1.11.0.jar:1.11.0]
 at
 org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
 ~[flink-dist_2.12-1.11.0.jar:1.11.0]
 at
 org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
 ~[flink-dist_2.12-1.11.0.jar:1.11.0]
 at java.lang.ClassLoader.loadClass(ClassLoader.java:522) ~[?:?]
 at java.lang.Class.forName0(Native Method) ~[?:?]
 at java.lang.Class.forName(Class.java:315) ~[?:?]
 at
 org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider.getConnection(SimpleJdbcConnectionProvider.java:52)
 ~myApp.jar:?]
 at
 org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat.establishConnection(AbstractJdbcOutputFormat.java:66)
 ~myApp.jar:?]
 at
 org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat.open(AbstractJdbcOutputFormat.java:59)
 ~myApp.jar:?]
 at
 org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.open(JDBCOutputFormat.java:82)
 ~myApp.jar:?]
 at
 org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:205)
 ~[flink-dist_2.12-1.11.0.jar:1.11.0]
 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
 ~[flink-dist_2.12-1.11.0.jar:1.11.0]
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
 ~[flink-dist_2.12-1.11.0.jar:1.11.0]
 at java.lang.Thread.run(Thread.java:834) ~[?:?]

 What should I do?

 Thanks in advance,
 Flavio

>>>


Re: [DISCUSS] FLIP-133: Rework PyFlink Documentation

2020-08-03 Thread David Anderson
Jincheng,

One thing that I like about the way that the documentation is currently
organized is that it's relatively straightforward to compare the Python API
with the Java and Scala versions. I'm concerned that if the PyFlink docs
are more independent, it will be challenging to respond to questions about
which features from the other APIs are available from Python.

David

On Mon, Aug 3, 2020 at 8:07 AM jincheng sun 
wrote:

> Would be great if you could join the contribution of PyFlink
> documentation @Marta !
> Thanks for all of the positive feedback. I will start a formal vote then
> later...
>
> Best,
> Jincheng
>
>
> On Mon, Aug 3, 2020 at 9:56 AM, Shuiqiang Chen wrote:
>
> > Hi jincheng,
> >
> > Thanks for the discussion. +1 for the FLIP.
> >
> > A well-organized documentation will greatly improve the efficiency and
> > experience for developers.
> >
> > Best,
> > Shuiqiang
> >
> > On Sat, Aug 1, 2020 at 8:42 AM, Hequn Cheng wrote:
> >
> >> Hi Jincheng,
> >>
> >> Thanks a lot for raising the discussion. +1 for the FLIP.
> >>
> >> I think this will bring big benefits for the PyFlink users. Currently,
> >> the Python TableAPI document is hidden deeply under the TableAPI&SQL tab
> >> which makes it quite unreadable. Also, the PyFlink documentation is
> mixed
> >> with Java/Scala documentation. It is hard for users to have an overview
> of
> >> all the PyFlink documents. As more and more functionalities are added
> into
> >> PyFlink, I think it's time for us to refactor the document.
> >>
> >> Best,
> >> Hequn
> >>
> >>
> >> On Fri, Jul 31, 2020 at 3:43 PM Marta Paes Moreira  >
> >> wrote:
> >>
> >>> Hi, Jincheng!
> >>>
> >>> Thanks for creating this detailed FLIP, it will make a big difference
> in
> >>> the experience of Python developers using Flink. I'm interested in
> >>> contributing to this work, so I'll reach out to you offline!
> >>>
> >>> Also, thanks for sharing some information on the adoption of PyFlink,
> >>> it's
> >>> great to see that there are already production users.
> >>>
> >>> Marta
> >>>
> >>> On Fri, Jul 31, 2020 at 5:35 AM Xingbo Huang 
> wrote:
> >>>
> >>> > Hi Jincheng,
> >>> >
> >>> > Thanks a lot for bringing up this discussion and the proposal.
> >>> >
> >>> > Big +1 for improving the structure of PyFlink doc.
> >>> >
> >>> > It will be very friendly to give PyFlink users a unified entrance to
> >>> learn
> >>> > PyFlink documents.
> >>> >
> >>> > Best,
> >>> > Xingbo
> >>> >
> >>> >> On Fri, Jul 31, 2020 at 11:00 AM, Dian Fu wrote:
> >>> >
> >>> >> Hi Jincheng,
> >>> >>
> >>> >> Thanks a lot for bringing up this discussion and the proposal. +1 to
> >>> >> improve the Python API doc.
> >>> >>
> >>> >> I have received many feedbacks from PyFlink beginners about
> >>> >> the PyFlink doc, e.g. the materials are too few, the Python doc is
> >>> mixed
> >>> >> with the Java doc and it's not easy to find the docs he wants to
> know.
> >>> >>
> >>> >> I think it would greatly improve the user experience if we can have
> >>> one
> >>> >> place which includes most knowledges PyFlink users should know.
> >>> >>
> >>> >> Regards,
> >>> >> Dian
> >>> >>
> >>> >> On Jul 31, 2020 at 10:14 AM, jincheng sun wrote:
> >>> >>
> >>> >> Hi folks,
> >>> >>
> >>> >> Since the release of Flink 1.11, users of PyFlink have continued to
> >>> >> grow. As far as I know, many companies have used PyFlink for data
> >>> >> analysis, and operation and maintenance monitoring businesses have
> >>> >> been put into production (such as 聚美优品 (Jumei) [1] and 浙江墨芷
> >>> >> (Mozhi) [2]). According to the feedback we received, the current
> >>> >> documentation is not very friendly to PyFlink users. There are two
> >>> >> shortcomings:
> >>> >>
> >>> >> - Python related content is mixed in the Java/Scala documentation,
> >>> which
> >>> >> makes it difficult for users who only focus on PyFlink to read.
> >>> >> - There is already a "Python Table API" section in the Table API
> >>> document
> >>> >> to store PyFlink documents, but the number of articles is small and
> >>> the
> >>> >> content is fragmented. It is difficult for beginners to learn from
> it.
> >>> >>
> >>> >> In addition, FLIP-130 introduced the Python DataStream API. Many
> >>> >> documents will be added for those new APIs. In order to increase the
> >>> >> readability and maintainability of the PyFlink document, Wei Zhong
> >>> and me
> >>> >> have discussed offline and would like to rework it via this FLIP.
> >>> >>
> >>> >> We will rework the document around the following three objectives:
> >>> >>
> >>> >> - Add a separate section for Python API under the "Application
> >>> >> Development" section.
> >>> >> - Restructure current Python documentation to a brand new structure
> to
> >>> >> ensure complete content and friendly to beginners.
> >>> >> - Improve the documents shared by Python/Java/Scala to make it more
> >>> >> friendly to Python users and without affecting Java/Scala users.
> >>> >>
> >>> >> More detail can be found in the FLIP-133:
> >>> >>
> >>>
> https://cwiki.apache.org/

Re: [Flink Unit Tests] Unit test for Flink streaming codes

2020-08-03 Thread Vijayendra Yadav
Thank you Arvid, David and Niels for your valuable inputs. One last
question: how do I terminate the Flink streaming execution environment
after the integration test is completed?

Regards
Vijay

On Sun, Aug 2, 2020 at 12:27 PM David Anderson 
wrote:

> Vijay,
>
> There's a section of the docs that describes some strategies for writing
> tests of various types, and it includes some Scala examples [1].
>
> There are also some nice examples from Konstantin Knauf in [2], though
> they are mostly in Java.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html
> 
> [2] https://github.com/knaufk/flink-testing-pyramid
>
> Best,
> David
>
> On Sun, Aug 2, 2020 at 12:14 PM Arvid Heise  wrote:
>
>> Hi Vijay,
>>
>> Any unit test of Flink operators is actually an IT case as it involves a
>> large portion of the stack. A real unit test, would be over a factored out
>> logic class.
>>
>> Similar to Niels, I'd recommend to use simple sources (env.fromElements)
>> and sinks to inject the data and retrieve the data and put the logic under
>> test in the middle. That may be a part of your pipeline or even the whole
>> pipeline.
>>
>> If you want to have some scala inspiration, have a look at:
>>
>> https://github.com/apache/flink/blob/5f0183fe79d10ac36101f60f2589062a39630f96/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SortITCase.scala#L56-L82
>> . It's on table API but should be quite easy to translate to datastream API
>> if needed.
>>
>> On Sat, Aug 1, 2020 at 4:03 PM Niels Basjes  wrote:
>>
>>> No, I only have Java.
>>>
>>> On Fri, 31 Jul 2020, 21:57 Vijayendra Yadav, 
>>> wrote:
>>>
 Thank You Niels. Would you have something for the scala object class.
 Say for example if I want to implement a unit test ( not integration test)
 for below code or similar  :


 https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketWindowWordCount.scala

 Regards,
 Vijay

 On Fri, Jul 31, 2020 at 12:22 PM Niels Basjes  wrote:

> Does this test in one of my own projects do what you are looking for?
>
>
> https://github.com/nielsbasjes/yauaa/blob/1e1ceb85c507134614186e3e60952112a2daabff/udfs/flink/src/test/java/nl/basjes/parse/useragent/flink/TestUserAgentAnalysisMapperClass.java#L107
>
>
> On Fri, 31 Jul 2020, 20:20 Vijayendra Yadav, 
> wrote:
>
>> Hi Team,
>>
>> Looking for some help and reference code / material to implement unit
>> tests of possible scenarios in Flink *streaming *Code that should
>> assert specific cases.
>>
>> Regards,
>> Vijay
>>
>
>>
>> --
>>
>> Arvid Heise | Senior Java Developer
>>
>> 
>>
>> Follow us @VervericaData
>>
>> --
>>
>> Join Flink Forward  - The Apache Flink
>> Conference
>>
>> Stream Processing | Event Driven | Real Time
>>
>> --
>>
>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>
>> --
>> Ververica GmbH
>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>> (Toni) Cheng
>>
>


Flink Kafka consumer SimpleStringSchema [Deprecated]

2020-08-03 Thread Vijayendra Yadav
 Hi Team,

https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html

new SimpleStringSchema() is showing as deprecated in my IntelliJ.
Although it's working fine, I wanted to check if there is a replacement for
it?



val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "test")
stream = env
.addSource(new FlinkKafkaConsumer[String]("topic", new
SimpleStringSchema(), properties))

Regards,
Vijay


Re: Behavior for flink job running on K8S failed after restart strategy exhausted

2020-08-03 Thread Eleanore Jin
Hi Till,

Thanks for the reply!

I manually deploy in per-job mode [1] and I am using Flink 1.8.2.
Specifically, I build a custom docker image into which I copied the app jar
(not an uber jar) and all its dependencies under /flink/lib.

So my question is: in this case, if the job is marked as FAILED, the pod
restart triggered by k8s does not seem to help at all. What are the
suggestions for such a scenario?

Thanks a lot!
Eleanore

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/deployment/kubernetes.html#flink-job-cluster-on-kubernetes

On Mon, Aug 3, 2020 at 2:13 AM Till Rohrmann  wrote:

> Hi Eleanore,
>
> how are you deploying Flink exactly? Are you using the application mode
> with native K8s support to deploy a cluster [1] or are you manually
> deploying a per-job mode [2]?
>
> I believe the problem might be that we terminate the Flink process with a
> non-zero exit code if the job reaches the ApplicationStatus.FAILED [3].
>
> cc Yang Wang have you observed a similar behavior when running Flink in
> per-job mode on K8s?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/native_kubernetes.html#flink-kubernetes-application
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/kubernetes.html#job-cluster-resource-definitions
> [3]
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ApplicationStatus.java#L32
>
> On Fri, Jul 31, 2020 at 6:26 PM Eleanore Jin 
> wrote:
>
>> Hi Experts,
>>
>> I have a flink cluster (per job mode) running on kubernetes. The job is
>> configured with restart strategy
>>
>> restart-strategy.fixed-delay.attempts: 3
>> restart-strategy.fixed-delay.delay: 10 s
>>
>>
>> So after 3 retries, the job will be marked as FAILED and the pods stop
>> running. However, Kubernetes will then restart the job again because the
>> available replicas do not match the desired count.
>>
>> I wonder what are the suggestions for such a scenario? How should I
>> configure the flink job running on k8s?
>>
>> Thanks a lot!
>> Eleanore
>>
>


Re: Flink Kafka consumer SimpleStringSchema [Deprecated]

2020-08-03 Thread Khachatryan Roman
Hi Vijay,

The javadoc for
org.apache.flink.streaming.util.serialization.SimpleStringSchema says
you should use org.apache.flink.api.common.serialization.SimpleStringSchema
instead.

Regards,
Roman
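A minimal sketch of the non-deprecated variant, written in Java for illustration; only the package of SimpleStringSchema changes, the rest of the snippet stays as in the question:

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaSourceExample {
    public static FlinkKafkaConsumer<String> createConsumer() {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "test");
        // Same constructor as before, but with the SimpleStringSchema from
        // org.apache.flink.api.common.serialization instead of the deprecated one.
        return new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties);
    }
}
```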


On Mon, Aug 3, 2020 at 5:31 PM Vijayendra Yadav 
wrote:

> Hi Team,
>
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html
>
> new SimpleStringSchema() is showing as deprecated in my IntelliJ.
> Although it's working fine, I wanted to check if there is a replacement for
> it?
>
>
>
> val properties = new Properties()
> properties.setProperty("bootstrap.servers", "localhost:9092")
> properties.setProperty("group.id", "test")
> stream = env
> .addSource(new FlinkKafkaConsumer[String]("topic", new
> SimpleStringSchema(), properties))
>
> Regards,
> Vijay
>


[Announce] Flink Forward Global Lineup Released

2020-08-03 Thread Seth Wiesman
Hi everyone!

I'm very excited to announce that the speaker lineup for Flink Forward
Global has been released. This is a fully online conference on October
21-22 and tickets are free. The lineup includes 45+ speakers from companies
like Spotify, AWS, Netflix, Houzz, and Workday sharing their Flink
experiences, plus community talks discussing the latest technology
developments.

Additionally, there are 6 half-day workshops (paid) to learn how to
develop, deploy, operate, and troubleshoot your Flink applications, as well
as other topics, such as Stateful Functions and Flink SQL. Seats for
training are limited.

Thank you to everyone who submitted a talk along with our amazing Program
Committee who helped put this lineup together.

Best,

Seth Wiesman
- Program Committee Chair - Flink Forward Global
- Committer Apache Flink


Re: [Announce] Flink Forward Global Lineup Released

2020-08-03 Thread Seth Wiesman
+ link

https://www.flink-forward.org/global-2020/speakers

On Mon, Aug 3, 2020 at 11:25 AM Seth Wiesman  wrote:

> Hi everyone!
>
> I'm very excited to announce that the speaker lineup for Flink Forward
> Global has been released. This is a fully online conference on October
> 21-22 and tickets are free. The lineup includes 45+ speakers from companies
> like Spotify, AWS, Netflix, Houzz, and Workday sharing their Flink
> experiences, plus community talks discussing the latest technology
> developments.
>
> Additionally, there are 6 half-day workshops (paid) to learn how to
> develop, deploy, operate, and troubleshoot your Flink applications, as well
> as other topics, such as Stateful Functions and Flink SQL. Seats for
> training are limited.
>
> Thank you to everyone who submitted a talk along with our amazing Program
> Committee who helped put this lineup together.
>
> Best,
>
> Seth Wiesman
> - Program Committee Chair - Flink Forward Global
> - Committer Apache Flink
>


Re: [Flink Unit Tests] Unit test for Flink streaming codes

2020-08-03 Thread Niels Basjes
Hi,

If you look at the example from my own project you'll see that this is not
a problem (if you test it like this).

In some rare testing cases you may run into this problem and for those:
have a look at what I did a few weeks ago for testing the PubSub connector:
https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedFullTopologyTest.java#L146

Here I'm using the fact that the DeserializationSchema interface has a
method isEndOfStream that can be used to terminate a stream.

Niels
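A minimal sketch of that termination trick (hedged: the sentinel value is made up for illustration; any deserialization schema whose isEndOfStream returns true ends the source, which lets the test's execute() call return):

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;

// Sketch: a schema that ends the stream when a sentinel record arrives, so an
// integration test's execute() call terminates on its own once the test data
// has been consumed.
public class TerminatingStringSchema extends SimpleStringSchema {
    @Override
    public boolean isEndOfStream(String nextElement) {
        // "END-OF-TEST" is an illustrative sentinel, not a Flink convention.
        return "END-OF-TEST".equals(nextElement);
    }
}
```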

On Mon, Aug 3, 2020 at 5:24 PM Vijayendra Yadav 
wrote:

> Thank you Arvid, David and Niels for your valuable inputs. One last
> Question: How do I terminate the flink streaming execution environment
> after the integration test is completed?
>
> Regards
> Vijay
>
> On Sun, Aug 2, 2020 at 12:27 PM David Anderson 
> wrote:
>
>> Vijay,
>>
>> There's a section of the docs that describes some strategies for writing
>> tests of various types, and it includes some Scala examples [1].
>>
>> There are also some nice examples from Konstantin Knauf in [2], though
>> they are mostly in Java.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html
>> 
>> [2] https://github.com/knaufk/flink-testing-pyramid
>>
>> Best,
>> David
>>
>> On Sun, Aug 2, 2020 at 12:14 PM Arvid Heise  wrote:
>>
>>> Hi Vijay,
>>>
>>> Any unit test of Flink operators is actually an IT case as it involves a
>>> large portion of the stack. A real unit test, would be over a factored out
>>> logic class.
>>>
>>> Similar to Niels, I'd recommend to use simple sources (env.fromElements)
>>> and sinks to inject the data and retrieve the data and put the logic under
>>> test in the middle. That may be a part of your pipeline or even the whole
>>> pipeline.
>>>
>>> If you want to have some scala inspiration, have a look at:
>>>
>>> https://github.com/apache/flink/blob/5f0183fe79d10ac36101f60f2589062a39630f96/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SortITCase.scala#L56-L82
>>> . It's on table API but should be quite easy to translate to datastream API
>>> if needed.
>>>
>>> On Sat, Aug 1, 2020 at 4:03 PM Niels Basjes  wrote:
>>>
 No, I only have Java.

 On Fri, 31 Jul 2020, 21:57 Vijayendra Yadav, 
 wrote:

> Thank You Niels. Would you have something for the scala object class.
> Say for example if I want to implement a unit test ( not integration test)
> for below code or similar  :
>
>
> https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketWindowWordCount.scala
>
> Regards,
> Vijay
>
> On Fri, Jul 31, 2020 at 12:22 PM Niels Basjes  wrote:
>
>> Does this test in one of my own projects do what you are looking for?
>>
>>
>> https://github.com/nielsbasjes/yauaa/blob/1e1ceb85c507134614186e3e60952112a2daabff/udfs/flink/src/test/java/nl/basjes/parse/useragent/flink/TestUserAgentAnalysisMapperClass.java#L107
>>
>>
>> On Fri, 31 Jul 2020, 20:20 Vijayendra Yadav, 
>> wrote:
>>
>>> Hi Team,
>>>
>>> Looking for some help and reference code / material to implement
>>> unit tests of possible scenarios in Flink *streaming *Code that
>>> should assert specific cases.
>>>
>>> Regards,
>>> Vijay
>>>
>>
>>>
>>> --
>>>
>>> Arvid Heise | Senior Java Developer
>>>
>>> 
>>>
>>> Follow us @VervericaData
>>>
>>> --
>>>
>>> Join Flink Forward  - The Apache Flink
>>> Conference
>>>
>>> Stream Processing | Event Driven | Real Time
>>>
>>> --
>>>
>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>
>>> --
>>> Ververica GmbH
>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>>> (Toni) Cheng
>>>
>>

-- 
Best regards / Met vriendelijke groeten,

Niels Basjes


Kafka transaction error lead to data loss under end to end exact-once

2020-08-03 Thread Lu Niu
Hi,

We are using end-to-end exactly-once Flink + Kafka and encountered the
following exception, which usually comes after checkpoint failures:
```
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer
attempted an operation with an old epoch. Either there is a newer producer
with the same transactionalId, or the producer's transaction has been expired
by the broker.

2020-07-28 16:27:51,633 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Job xxx
(f08fc4b1edceb3705e2cb134a8ece73d) switched from state RUNNING to FAILING.
java.lang.RuntimeException: Error while confirming checkpoint
    at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkRuntimeException: Committing one of transactions failed, logging first encountered failure
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:295)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:842)
    at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1214)
    ... 5 more
```
We did some end-to-end tests and noticed that whenever this happens,
there is data loss.

Referring to several related questions, I understand I need to increase `
transaction.timeout.ms`  because:
```
Semantic.EXACTLY_ONCE mode relies on the ability to commit transactions
that were started before taking a checkpoint, after recovering from the
said checkpoint. If the time between Flink application crash and completed
restart is larger than Kafka’s transaction timeout there will be data loss
(Kafka will automatically abort transactions that exceeded timeout time).
```
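(For illustration, a minimal Scala sketch of raising the producer-side transaction timeout when constructing the sink, assuming the universal FlinkKafkaProducer connector; topic names and values are placeholders, constructor details may differ across versions, and the broker-side transaction.max.timeout.ms must be at least as large as the value set here.)

```
import java.nio.charset.StandardCharsets
import java.util.Properties

import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaProducer, KafkaSerializationSchema}

val producerProps = new Properties()
producerProps.setProperty("bootstrap.servers", "localhost:9092")
// Raise the transaction timeout so that in-flight transactions survive the time
// between a crash and a completed restart; brokers cap this via transaction.max.timeout.ms.
producerProps.setProperty("transaction.timeout.ms", "900000") // 15 minutes, placeholder

val serializer = new KafkaSerializationSchema[String] {
  override def serialize(element: String, timestamp: java.lang.Long): ProducerRecord[Array[Byte], Array[Byte]] =
    new ProducerRecord[Array[Byte], Array[Byte]]("events-out", element.getBytes(StandardCharsets.UTF_8))
}

val sink = new FlinkKafkaProducer[String](
  "events-out", // default topic, placeholder
  serializer,
  producerProps,
  FlinkKafkaProducer.Semantic.EXACTLY_ONCE)
```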

But I want to confirm with the community:
Will an exception like this always lead to data loss?

I ask because we sometimes get this exception even when the checkpoint
succeeds.

Setup:
Flink 1.9.1

Best
Lu


Any change in behavior related to the "web.upload.dir" behavior between Flink 1.9 and 1.11

2020-08-03 Thread Avijit Saha
Hello,

Has there been any change in behavior related to the "web.upload.dir"
behavior between Flink 1.9 and 1.11?

I have a failure case where when build an image using
"flink:1.11.0-scala_2.12" in Dockerfile, the job manager job submissions
fail with the following Exception but the same flow works fine (for the
same underlying Code image) when using
"flink:1.9.1-scala_2.12"..

This is the Exception stack trace for 1.11 and not seen using 1.9:
--
Caused by: java.nio.file.FileAlreadyExistsException:
/opt/flink/flink-web-upload
at
sun.nio.fs.UnixException.translateToIOException(UnixException.java:88)
~[?:1.8.0_262]
at
sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
~[?:1.8.0_262]
at
sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
~[?:1.8.0_262]
at
sun.nio.fs.UnixFileSystemProvider.createDirectory(UnixFileSystemProvider.java:384)
~[?:1.8.0_262]
at java.nio.file.Files.createDirectory(Files.java:674)
~[?:1.8.0_262]
at java.nio.file.Files.createAndCheckIsDirectory(Files.java:781)
~[?:1.8.0_262]
at java.nio.file.Files.createDirectories(Files.java:727)
~[?:1.8.0_262]
at
org.apache.flink.runtime.rest.RestServerEndpoint.checkAndCreateUploadDir(RestServerEndpoint.java:478)
~[flink-dist_2.12-1.11.0.jar:1.11.0]
at
org.apache.flink.runtime.rest.RestServerEndpoint.createUploadDir(RestServerEndpoint.java:462)
~[flink-dist_2.12-1.11.0.jar:1.11.0]
at
org.apache.flink.runtime.rest.RestServerEndpoint.<init>(RestServerEndpoint.java:114)
~[flink-dist_2.12-1.11.0.jar:1.11.0]
at
org.apache.flink.runtime.webmonitor.WebMonitorEndpoint.<init>(WebMonitorEndpoint.java:200)
~[flink-dist_2.12-1.11.0.jar:1.11.0]
at
org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint.<init>(DispatcherRestEndpoint.java:68)
~[flink-dist_2.12-1.11.0.jar:1.11.0]
at
org.apache.flink.runtime.rest.SessionRestEndpointFactory.createRestEndpoint(SessionRestEndpointFactory.java:63)
~[flink-dist_2.12-1.11.0.jar:1.11.0]
at
org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:152)
~[flink-dist_2.12-1.11.0.jar:1.11.0]
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:216)
~[flink-dist_2.12-1.11.0.jar:1.11.0]
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:169)
~[flink-dist_2.12-1.11.0.jar:1.11.0]
at
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
~[flink-dist_2.12-1.11.0.jar:1.11.0]
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:168)
~[flink-dist_2.12-1.11.0.jar:1.11.0]
... 2 more


Re: Any change in behavior related to the "web.upload.dir" behavior between Flink 1.9 and 1.11

2020-08-03 Thread Chesnay Schepler

From what I can tell we have not changed anything.

Are you making any modifications to the image? This exception should 
only be thrown if there is already a file with the same path, and I 
don't think Flink would do that.


On 03/08/2020 21:43, Avijit Saha wrote:

Hello,

Has there been any change in behavior related to the "web.upload.dir" 
behavior between Flink 1.9 and 1.11?


I have a failure case where when build an image using 
"flink:1.11.0-scala_2.12" in Dockerfile, the job manager job 
submissions fail with the following Exception but the same flow works 
fine (for the same underlying Code image) when using 
"flink:1.9.1-scala_2.12"..


This is the Exception stack trace for 1.11 and not seen using 1.9:
--
Caused by: java.nio.file.FileAlreadyExistsException: 
/opt/flink/flink-web-upload
at 
sun.nio.fs.UnixException.translateToIOException(UnixException.java:88) 
~[?:1.8.0_262]
at 
sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102) 
~[?:1.8.0_262]
at 
sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107) 
~[?:1.8.0_262]
at 
sun.nio.fs.UnixFileSystemProvider.createDirectory(UnixFileSystemProvider.java:384) 
~[?:1.8.0_262]
at java.nio.file.Files.createDirectory(Files.java:674) 
~[?:1.8.0_262]
at 
java.nio.file.Files.createAndCheckIsDirectory(Files.java:781) 
~[?:1.8.0_262]
at java.nio.file.Files.createDirectories(Files.java:727) 
~[?:1.8.0_262]
at 
org.apache.flink.runtime.rest.RestServerEndpoint.checkAndCreateUploadDir(RestServerEndpoint.java:478) 
~[flink-dist_2.12-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.rest.RestServerEndpoint.createUploadDir(RestServerEndpoint.java:462) 
~[flink-dist_2.12-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.rest.RestServerEndpoint.<init>(RestServerEndpoint.java:114) 
~[flink-dist_2.12-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.webmonitor.WebMonitorEndpoint.<init>(WebMonitorEndpoint.java:200) 
~[flink-dist_2.12-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint.<init>(DispatcherRestEndpoint.java:68) 
~[flink-dist_2.12-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.rest.SessionRestEndpointFactory.createRestEndpoint(SessionRestEndpointFactory.java:63) 
~[flink-dist_2.12-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:152) 
~[flink-dist_2.12-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:216) 
~[flink-dist_2.12-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:169) 
~[flink-dist_2.12-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30) 
~[flink-dist_2.12-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:168) 
~[flink-dist_2.12-1.11.0.jar:1.11.0]

... 2 more





Re: Any change in behavior related to the "web.upload.dir" behavior between Flink 1.9 and 1.11

2020-08-03 Thread Avijit Saha
Thanks!

It seems the problem went away when I started using 'ln -s
$FLINK_HOME/usrlib $FLINK_HOME/flink-web-upload' in my Dockerfile!


On Mon, Aug 3, 2020 at 3:09 PM Chesnay Schepler  wrote:

> From what I can tell we have not changed anything.
>
> Are you making any modifications to the image? This exception should only
> be thrown if there is already a file with the same path, and I don't think
> Flink would do that.
>
> On 03/08/2020 21:43, Avijit Saha wrote:
>
> Hello,
>
> Has there been any change in behavior related to the "web.upload.dir"
> behavior between Flink 1.9 and 1.11?
>
> I have a failure case where when build an image using
> "flink:1.11.0-scala_2.12" in Dockerfile, the job manager job submissions
> fail with the following Exception but the same flow works fine (for the
> same underlying Code image) when using
> "flink:1.9.1-scala_2.12"..
>
> This is the Exception stack trace for 1.11 and not seen using 1.9:
>
> --
> Caused by: java.nio.file.FileAlreadyExistsException:
> /opt/flink/flink-web-upload
> at
> sun.nio.fs.UnixException.translateToIOException(UnixException.java:88)
> ~[?:1.8.0_262]
> at
> sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
> ~[?:1.8.0_262]
> at
> sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
> ~[?:1.8.0_262]
> at
> sun.nio.fs.UnixFileSystemProvider.createDirectory(UnixFileSystemProvider.java:384)
> ~[?:1.8.0_262]
> at java.nio.file.Files.createDirectory(Files.java:674)
> ~[?:1.8.0_262]
> at java.nio.file.Files.createAndCheckIsDirectory(Files.java:781)
> ~[?:1.8.0_262]
> at java.nio.file.Files.createDirectories(Files.java:727)
> ~[?:1.8.0_262]
> at
> org.apache.flink.runtime.rest.RestServerEndpoint.checkAndCreateUploadDir(RestServerEndpoint.java:478)
> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
> at
> org.apache.flink.runtime.rest.RestServerEndpoint.createUploadDir(RestServerEndpoint.java:462)
> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
> at
> org.apache.flink.runtime.rest.RestServerEndpoint.<init>(RestServerEndpoint.java:114)
> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
> at
> org.apache.flink.runtime.webmonitor.WebMonitorEndpoint.<init>(WebMonitorEndpoint.java:200)
> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
> at
> org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint.<init>(DispatcherRestEndpoint.java:68)
> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
> at
> org.apache.flink.runtime.rest.SessionRestEndpointFactory.createRestEndpoint(SessionRestEndpointFactory.java:63)
> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
> at
> org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:152)
> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:216)
> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:169)
> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
> at
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:168)
> ~[flink-dist_2.12-1.11.0.jar:1.11.0]
> ... 2 more
>
>
>


Re: Flink Kafka consumer SimpleStringSchema [Deprecated]

2020-08-03 Thread Vijayendra Yadav
Thank you Roman. There is one more: env.setStateBackend

Deprecated: setStateBackend from org.apache.flink.streaming.api.scala

Regards,
Vijay

On Mon, Aug 3, 2020 at 9:21 AM Khachatryan Roman <
khachatryan.ro...@gmail.com> wrote:

> Hi Vijay,
>
> The javadoc for
> org.apache.flink.streaming.util.serialization.SimpleStringSchema says
> you should Use
> org.apache.flink.api.common.serialization.SimpleStringSchema instead.
>
> Regards,
> Roman
>
>
> On Mon, Aug 3, 2020 at 5:31 PM Vijayendra Yadav 
> wrote:
>
>> Hi Team,
>>
>>
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html
>>
>> new SimpleStringSchema() is showing as deprecated in my IntelliJ.
>> Although it's working fine, I wanted to check whether there is a replacement
>> for it?
>>
>>
>>
>> val properties = new Properties()
>> properties.setProperty("bootstrap.servers", "localhost:9092")
>> properties.setProperty("group.id", "test")
>> stream = env
>> .addSource(new FlinkKafkaConsumer[String]("topic", new
>> SimpleStringSchema(), properties))
>>
>> Regards,
>> Vijay
>>
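(For reference, the swap Roman suggests amounts to changing the import; a minimal sketch, assuming a Scala StreamExecutionEnvironment named env as in the quoted snippet:)

```
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema // non-deprecated class
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "test")

// Same consumer as above, but with the replacement schema class.
val stream = env.addSource(
  new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties))
```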
>


Flink CPU load metrics in K8s

2020-08-03 Thread Bajaj, Abhinav
Hi,



I am trying to understand the CPU Load metrics reported by Flink 1.7.1 running 
with openjdk 1.8.0_212 on K8s.



After deploying the Flink job on K8s, I tried to get CPU load metrics following
this documentation.

curl 
localhost:8081/taskmanagers/7737ac33b311ea0a696422680711597b/metrics?get=Status.JVM.CPU.Load,Status.JVM.CPU.Time

[{"id":"Status.JVM.CPU.Load","value":"0.0023815194093831865"},{"id":"Status.JVM.CPU.Time","value":"2326000"}]



The value of the CPU load looks odd to me.



What is the unit and scale of this value?

How does Flink determine this value?
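(Not an authoritative answer, but for context: these gauges appear to be backed by the JVM's com.sun.management.OperatingSystemMXBean; that is an assumption, please verify against the Flink source for your version. A small Scala sketch querying the same bean shows the units involved:)

```
import java.lang.management.ManagementFactory
import com.sun.management.OperatingSystemMXBean

val os = ManagementFactory.getOperatingSystemMXBean.asInstanceOf[OperatingSystemMXBean]

// Recent CPU usage of this JVM process as a fraction in [0.0, 1.0]
// (1.0 means all cores were fully busy with this process; -1.0 if unavailable).
println(os.getProcessCpuLoad)

// Cumulative CPU time consumed by this JVM process, in nanoseconds.
println(os.getProcessCpuTime)
```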



Appreciate your time and help here.

~ Abhinav Bajaj



Re: Flink streaming job logging reserves space

2020-08-03 Thread Yang Wang
Hi Maxim,

First, I want to confirm with you: have you checked all the
"yarn.nodemanager.log-dirs"? If you can
access the logs in the Flink web UI, the log files (e.g. taskmanager.log,
taskmanager.out, taskmanager.err)
should exist. I suggest double-checking all of the configured log dirs.

Since the *.out/*.err files do not roll, if you print user logs to
stdout/stderr, these two files will keep growing
over time.

When you stop the Flink application, Yarn will clean up all the jars and
logs, which is why you then see the disk space come back.


Best,
Yang

On Thu, Jul 30, 2020 at 10:00 PM Maxim Parkachov wrote:

> Hi everyone,
>
> I have a strange issue with flink logging. I use pretty much standard log4
> config, which is writing to standard output in order to see it in Flink
> GUI. Deployment is on YARN with job mode. I can see logs in UI, no problem.
> On the servers, where Flink YARN containers are running, there is disk
> quota on the partition where YARN normally creates logs. I see no specific
> files in the application_xx directory, but space on the disk is actually
> decreasing with time. After several weeks we eventually hit quota. It seems
> like some file or pipe is created but not closed, but still reserves the
> space. After I restart Flink job, space is immediately returned back. I'm
> sure that flink job is the problem, I have re-produces issue on a cluster
> where only 1 filnk job was running. Below is my log4 config. Any help or
> idea is appreciated.
>
> Thanks in advance,
> Maxim.
> ---
> # This affects logging for both user code and Flink
> log4j.rootLogger=INFO, file, stderr
>
> # Uncomment this if you want to _only_ change Flink's logging
> #log4j.logger.org.apache.flink=INFO
>
> # The following lines keep the log level of common libraries/connectors on
> # log level INFO. The root logger does not override this. You have to
> manually
> # change the log levels here.
> log4j.logger.akka=INFO
> log4j.logger.org.apache.kafka=INFO
> log4j.logger.org.apache.hadoop=INFO
> log4j.logger.org.apache.zookeeper=INFO
>
> # Log all infos in the given file
> log4j.appender.file=org.apache.log4j.FileAppender
> log4j.appender.file.file=${log.file}
> log4j.appender.file.append=false
> log4j.appender.file.layout=org.apache.log4j.PatternLayout
> log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS}
> %-5p %-60c %x - %m%n
>
> # Suppress the irrelevant (wrong) warnings from the Netty channel handler
> log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR,
> file
>
>


Re: Per-job mode job restart and HA configuration

2020-08-03 Thread Yang Wang
Hi Suchithra,

Roman is right. You still need ZooKeeper HA configured so that the job
can recover successfully when the jobmanager fails over.
Although the job jar is bundled in the image, the checkpoint counter and path
need to be stored in ZooKeeper. When the jobmanager
terminates exceptionally and is relaunched by K8s, it needs to recover from the
latest checkpoint automatically.

Another reason is leader election and retrieval. In some corner cases,
for example when the kubelet crashes, two jobmanagers may be
running even though the replica count of the deployment is 1. We need ZooKeeper
for leader election and leader retrieval so that the taskmanagers
can find the active jobmanager.
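(For readers looking for the concrete settings, a minimal sketch of the ZooKeeper HA entries in flink-conf.yaml; the quorum, storage path, and cluster-id below are placeholders:)

```
high-availability: zookeeper
high-availability.zookeeper.quorum: zk-1:2181,zk-2:2181,zk-3:2181
# any durable filesystem reachable by all jobmanagers and taskmanagers
high-availability.storageDir: s3://my-bucket/flink/ha/
# one id per job cluster so multiple per-job clusters can share ZooKeeper
high-availability.cluster-id: /my-per-job-cluster
```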

Native K8s HA has been requested in FLINK-12884 [1]; I will try to get it
implemented in the next major release (1.12). After that, the HA configuration
on K8s will be more convenient.


[1]. https://issues.apache.org/jira/browse/FLINK-12884


Best,
Yang

On Mon, Aug 3, 2020 at 10:03 PM Khachatryan Roman wrote:

> Hi Suchithra,
>
> Yes, you need to pass these parameters to standalone-job.sh in Kubernetes
> job definition.
>
> I'm pulling in Patrick as he might know this subject better.
>
> Regards,
> Roman
>
>
> On Mon, Aug 3, 2020 at 12:24 PM V N, Suchithra (Nokia - IN/Bangalore) <
> suchithra@nokia.com> wrote:
>
>> Hello,
>>
>>
>>
>> I am using Flink version 1.10.1 in a Kubernetes environment. In per-job
>> mode of Flink, do we need ZooKeeper and HA parameters to achieve HA and
>> restart the job? I am wondering because the job jar is part of the Docker
>> image itself.
>>
>>
>>
>> Thanks,
>>
>> Suchithra
>>
>


Re: Flink streaming job logging reserves space

2020-08-03 Thread Maxim Parkachov
Hi Yang,

you are right. Since then, I have looked for open files and found the
*.out/*.err files on that partition, and as you mentioned they don't roll.
I could implement a workaround that restarts the streaming job every week or
so, but I really don't want to go that way.

I tried forwarding the logs to files, which I could then roll, but then I
don't see the logs in the GUI.

So my question would be: how do I make them roll?

Regards,
Maxim.
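(For what it's worth: as far as I can tell, the *.out/*.err files are plain stdout/stderr redirects created by the container launch scripts, so the log4j config cannot make them roll. For the log4j-managed ${log.file} itself, a rolling variant of the appender from the quoted config would look roughly like this; log4j 1.x, with the size and backup count as placeholders:)

```
# Sketch: size-based rolling for the log4j-managed log file (log4j 1.x).
log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.file=${log.file}
# roll the file once it reaches 100 MB and keep at most 5 rolled files
log4j.appender.file.MaxFileSize=100MB
log4j.appender.file.MaxBackupIndex=5
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
```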

On Tue, Aug 4, 2020 at 4:48 AM Yang Wang  wrote:

> Hi Maxim,
>
> First, i want to confirm with you that do you have checked all the
> "yarn.nodemanager.log-dirs". If you
> could access the logs in Flink webUI, the log files(e.g. taskmanager.log,
> taskmanager.out, taskmanager.err)
> should exist. I suggest to double check the multiple log-dirs.
>
> Since the *.out/err files do not roll, if you print some user logs to the
> stdout/stderr, the two files will increase
> over time.
>
> When you stop the Flink application, Yarn will clean up all the jars and
> logs, so you find that the disk space get back.
>
>
> Best,
> Yang
>
> On Thu, Jul 30, 2020 at 10:00 PM Maxim Parkachov wrote:
>
>> Hi everyone,
>>
>> I have a strange issue with Flink logging. I use a pretty much standard
>> log4j config, which writes to standard output so that the logs show up in
>> the Flink GUI. Deployment is on YARN in job mode. I can see logs in the UI, no
>> problem. On the servers where the Flink YARN containers are running, there is
>> a disk quota on the partition where YARN normally creates logs. I see no
>> specific files in the application_xx directory, but space on the disk is
>> actually decreasing over time. After several weeks we eventually hit the quota.
>> It seems like some file or pipe is created but not closed and still
>> reserves the space. After I restart the Flink job, the space is
>> immediately returned. I'm sure the Flink job is the problem; I have
>> reproduced the issue on a cluster where only one Flink job was running. Below is
>> my log4j config. Any help or ideas appreciated.
>>
>> Thanks in advance,
>> Maxim.
>> ---
>> # This affects logging for both user code and Flink
>> log4j.rootLogger=INFO, file, stderr
>>
>> # Uncomment this if you want to _only_ change Flink's logging
>> #log4j.logger.org.apache.flink=INFO
>>
>> # The following lines keep the log level of common libraries/connectors on
>> # log level INFO. The root logger does not override this. You have to
>> manually
>> # change the log levels here.
>> log4j.logger.akka=INFO
>> log4j.logger.org.apache.kafka=INFO
>> log4j.logger.org.apache.hadoop=INFO
>> log4j.logger.org.apache.zookeeper=INFO
>>
>> # Log all infos in the given file
>> log4j.appender.file=org.apache.log4j.FileAppender
>> log4j.appender.file.file=${log.file}
>> log4j.appender.file.append=false
>> log4j.appender.file.layout=org.apache.log4j.PatternLayout
>> log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS}
>> %-5p %-60c %x - %m%n
>>
>> # Suppress the irrelevant (wrong) warnings from the Netty channel handler
>> log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR,
>> file
>>
>>


Re: Behavior for flink job running on K8S failed after restart strategy exhausted

2020-08-03 Thread Yang Wang
Hi Eleanore,

I think you are using K8s resource "Job" to deploy the jobmanager. Please
set .spec.template.spec.restartPolicy = "Never" and spec.backoffLimit = 0.
Refer here[1] for more information.

Then, when the jobmanager fails for any reason, the K8s job will be
marked as failed, and K8s will not restart it again.

[1].
https://kubernetes.io/docs/concepts/workloads/controllers/job/#job-termination-and-cleanup
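(A minimal sketch of the relevant part of such a Job manifest; the image name and args are placeholders, see the per-job resource definitions in the Flink docs for the full spec:)

```
apiVersion: batch/v1
kind: Job
metadata:
  name: flink-jobmanager
spec:
  backoffLimit: 0            # no retries by the Job controller once the pod fails
  template:
    spec:
      restartPolicy: Never   # do not restart the failed container in place
      containers:
        - name: jobmanager
          image: my-flink-job:1.8.2   # placeholder: the custom per-job image
          args: ["job-cluster", "--job-classname", "com.example.MyStreamingJob"]
```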


Best,
Yang

On Tue, Aug 4, 2020 at 12:05 AM Eleanore Jin wrote:

> Hi Till,
>
> Thanks for the reply!
>
> I manually deploy in per-job mode [1] and I am using Flink 1.8.2.
> Specifically, I build a custom docker image into which I copied the app jar
> (not an uber jar) and all its dependencies under /flink/lib.
>
> So my question is more like: in this case the job is marked as FAILED,
> which causes K8s to restart the pod, but that does not seem to help at all.
> What are the suggestions for such a scenario?
>
> Thanks a lot!
> Eleanore
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/deployment/kubernetes.html#flink-job-cluster-on-kubernetes
>
> On Mon, Aug 3, 2020 at 2:13 AM Till Rohrmann  wrote:
>
>> Hi Eleanore,
>>
>> how are you deploying Flink exactly? Are you using the application mode
>> with native K8s support to deploy a cluster [1] or are you manually
>> deploying a per-job mode [2]?
>>
>> I believe the problem might be that we terminate the Flink process with a
>> non-zero exit code if the job reaches the ApplicationStatus.FAILED [3].
>>
>> cc Yang Wang have you observed a similar behavior when running Flink in
>> per-job mode on K8s?
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/native_kubernetes.html#flink-kubernetes-application
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/kubernetes.html#job-cluster-resource-definitions
>> [3]
>> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ApplicationStatus.java#L32
>>
>> On Fri, Jul 31, 2020 at 6:26 PM Eleanore Jin 
>> wrote:
>>
>>> Hi Experts,
>>>
>>> I have a flink cluster (per job mode) running on kubernetes. The job is
>>> configured with restart strategy
>>>
>>> restart-strategy.fixed-delay.attempts: 3
>>> restart-strategy.fixed-delay.delay: 10 s
>>>
>>>
>>> So after 3 retries, the job will be marked as FAILED, and hence the pods
>>> are no longer running. However, Kubernetes will then restart the job again,
>>> as the available replicas do not match the desired count.
>>>
>>> I wonder what are the suggestions for such a scenario? How should I
>>> configure the flink job running on k8s?
>>>
>>> Thanks a lot!
>>> Eleanore
>>>
>>