Re: Unit testing PyFlink SQL project

2022-04-23 Thread Dian Fu
Hi John,

I have written an example of how to write unit tests for Flink
functionality with PyFlink in [1]. I hope it is helpful. Feel free
to let me know if you run into any problems.

Regards,
Dian

[1] https://github.com/dianfu/pyflink-faq/tree/main/testing
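
A minimal sketch of such a test, for illustration only (this is not the code
from [1], and it assumes nothing beyond the pip-installed apache-flink
package): it runs SQL against inline VALUES in batch mode, so no connector
jars or Flink-repo utility classes are needed.

import unittest

from pyflink.table import EnvironmentSettings, TableEnvironment


class SqlTestCase(unittest.TestCase):
    """Unit test for a SQL snippet; no external systems required."""

    def setUp(self):
        # Batch mode keeps the result set finite and easy to collect.
        settings = EnvironmentSettings.new_instance().in_batch_mode().build()
        self.t_env = TableEnvironment.create(settings)

    def test_filter(self):
        table = self.t_env.sql_query(
            "SELECT name, amount "
            "FROM (VALUES ('a', 1), ('b', 5)) AS t(name, amount) "
            "WHERE amount > 2")
        with table.execute().collect() as results:
            rows = [(r[0], r[1]) for r in results]
        self.assertEqual(rows, [('b', 5)])


if __name__ == '__main__':
    unittest.main()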

On Sun, Apr 24, 2022 at 9:25 AM Dian Fu  wrote:

> Hi John,
>
> >> I don't know how to fix this. I've tried adding `flink-table-planner`
> and `flink-table-planner-blink` dependencies with `test-jar`
> to my dummy pom.xml, but it still fails.
> What's the failure after doing this? The flink-table-planner*-tests.jar
> should be available in the Maven repository [1].
>
> >> This is starting to feel like a real pain to do something that should
> be trivial: basic TDD of a PyFlink project.  Is there a real-world example
> of a Python project that shows how to set up a testing environment for unit
> testing SQL with PyFlink?
> I'm not aware of such a project; however, I agree that this is an important
> aspect that should be improved. I will look into this.
>
> Regards,
> Dian
>
> [1]
> https://repo1.maven.org/maven2/org/apache/flink/flink-table-planner_2.11/1.13.6/
>
>
> On Sun, Apr 24, 2022 at 4:44 AM John Tipper wrote:
>
>> Hi all,
>>
>> Is there an example of a self-contained repository showing how to perform
>> SQL unit testing of PyFlink (specifically 1.13.x if possible)?  I have
>> cross-posted the question to Stack Overflow here:
>> https://stackoverflow.com/questions/71983434/is-there-an-example-of-pyflink-sql-unit-testing-in-a-self-contained-repo
>>
>>
>> There is a related SO question (
>> https://stackoverflow.com/questions/69937520/pyflink-sql-local-test),
>> where it is suggested to use some of the tests from PyFlink itself.  The
>> issue I'm running into is that the PyFlink repo assumes that a bunch of
>> things are on the Java classpath and that some Python utility classes are
>> available (they're not distributed via PyPi apache-flink).
>>
>> I have done the following:
>>
>>
>>    1. Copied `test_case_utils.py` and `source_sink_utils.py` from PyFlink
>>    (https://github.com/apache/flink/tree/f8172cdbbc27344896d961be4b0b9cdbf000b5cd/flink-python/pyflink/testing)
>>    into my project.
>>    2. Copied an example unit test
>>    (https://github.com/apache/flink/blob/f8172cdbbc27344896d961be4b0b9cdbf000b5cd/flink-python/pyflink/table/tests/test_sql.py#L39),
>>    as suggested by the related SO question.
>>    3. Tried to run the test, which fails because the test case cannot
>>    determine which version of the Avro jars to download
>>    (`download_apache_avro()` fails because pyflink_gateway_server.py tries
>>    to evaluate the value of `avro.version` by running `mvn help:evaluate
>>    -Dexpression=avro.version`).
>>
>> I then added a dummy `pom.xml` defining a Maven property of
>> `avro.version` (with a value of `1.10.0`), and my unit test case now loads.
>>
>> I now get a new error and my test is skipped:
>>
>> 'flink-table-planner*-tests.jar' is not available. Will skip the
>> related tests.
>>
>> I don't know how to fix this. I've tried adding `flink-table-planner` and
>> `flink-table-planner-blink` dependencies with `test-jar` to my
>> dummy pom.xml, but it still fails.
>>
>> This is starting to feel like a real pain to do something that should be
>> trivial: basic TDD of a PyFlink project.  Is there a real-world example of
>> a Python project that shows how to set up a testing environment for unit
>> testing SQL with PyFlink?
>>
>> Many thanks,
>>
>> John
>>
>


Fwd: Write to Aliyun OSS via FileSystem connector hang Job Master on Finishing

2022-04-23 Thread Yi Tang
-- Forwarded message -
From: Yi Tang 
Date: Sun, Apr 24, 2022 at 11:29 AM
Subject: Write to Aliyun OSS via FileSystem connector hang Job Master on
Finishing
To: 


Hi team,

I'm trying to write to Aliyun OSS via the FileSystem connector. The job master
always hangs on finishing.

It looks like this is because the FileSystem connector commits the files via
#finalizeGlobal while the job is finishing, and the commit includes some rename
operations. However, the Aliyun OSS FileSystem implements rename by copying,
which is expensive.

Any suggestions about this scenario?

Thanks and regards.


Re: JobManager doesn't bring up new TaskManager during failure recovery

2022-04-23 Thread Zheng, Chenyu
Thank you so much, Yang!

It looks very likely that this issue is the cause of the bug I hit. I'll
upgrade my Flink version and test whether I can still reproduce it.

BRs,
Chenyu

From: Yang Wang 
Date: Sunday, April 24, 2022 at 10:20 AM
To: "Zheng, Chenyu" 
Cc: "user@flink.apache.org" , "user...@flink.apache.org" 

Subject: Re: JobManager doesn't bring up new TaskManager during failure recovery

After more debugging, I think this issue is the same as FLINK-24315 [1], which
is fixed in 1.13.3.

[1] https://issues.apache.org/jira/browse/FLINK-24315

Best,
Yang

Zheng, Chenyu <chenyu.zh...@disneystreaming.com> wrote on Friday, April 22,
2022 at 18:27:
I created a JIRA ticket https://issues.apache.org/jira/browse/FLINK-27350
to track this issue.

BRs,
Chenyu

From: "Zheng, Chenyu" 
mailto:chenyu.zh...@disneystreaming.com>>
Date: Friday, April 22, 2022 at 6:26 PM
To: Yang Wang mailto:danrtsey...@gmail.com>>
Cc: "user@flink.apache.org" 
mailto:user@flink.apache.org>>, 
"user...@flink.apache.org" 
mailto:user...@flink.apache.org>>
Subject: Re: JobManager doesn't bring up new TaskManager during failure recovery

Thank you, Yang!

In fact I have a fine-grained dashboard for Kubernetes cluster health (like
apiserver QPS/latency, etc.), and I didn't find anything unusual. Also, the
JobManager container CPU/memory usage is low.

Besides, I did a deep dive into these logs and the Flink resource manager code,
and found something interesting. I'll use taskmanager-1-9 as an example:

  1.  I can see the log “Requesting new worker with resource spec
WorkerResourceSpec” at 2022-04-17 00:33:15,333. And the code location is here.
  2.  “Creating new TaskManager pod with name
stream-1376a7c25e714f06b2ca818af964c45c-taskmanager-1-9 and resource
<16384,4.0>” at 2022-04-17 00:33:15,376, code location.
  3.  “Pod stream-1376a7c25e714f06b2ca818af964c45c-taskmanager-1-9 is created.”
at 2022-04-17 00:33:15,412, code location.
      The request is sent and the pod is created here, so I think the apiserver
was healthy at that moment.
  4.  But I cannot find any logs that print in line and line.
  5.  “Discard registration from TaskExecutor
stream-1376a7c25e714f06b2ca818af964c45c-taskmanager-1-9” at 2022-04-17
00:33:32,393. Root cause of this log is due to the workerNodeMap

Re: JobManager doesn't bring up new TaskManager during failure recovery

2022-04-23 Thread Yang Wang
After more debugging, I think this issue is the same as FLINK-24315 [1],
which is fixed in 1.13.3.

[1] https://issues.apache.org/jira/browse/FLINK-24315

Best,
Yang

Zheng, Chenyu wrote on Friday, April 22, 2022 at 18:27:

> I created a JIRA ticket https://issues.apache.org/jira/browse/FLINK-27350
> to track this issue.
>
>
>
> BRs,
>
> Chenyu
>
>
>
> *From: *"Zheng, Chenyu" 
> *Date: *Friday, April 22, 2022 at 6:26 PM
> *To: *Yang Wang 
> *Cc: *"user@flink.apache.org" , "
> user...@flink.apache.org" 
> *Subject: *Re: JobManager doesn't bring up new TaskManager during failure
> recovery
>
>
>
> Thank you, Yang!
>
>
>
> In fact I have a fine-grained dashboard for Kubernetes cluster health
> (like apiserver QPS/latency, etc.), and I didn't find anything unusual.
> Also, the JobManager container CPU/memory usage is low.
>
>
>
> Besides, I did a deep dive into these logs and the Flink resource manager
> code, and found something interesting. I'll use taskmanager-1-9 as an
> example:
>
>    1. I can see the log “Requesting new worker with resource spec
>    WorkerResourceSpec” at 2022-04-17 00:33:15,333. And the code location is
>    here.
>    2. “Creating new TaskManager pod with name
>    stream-1376a7c25e714f06b2ca818af964c45c-taskmanager-1-9 and resource
>    <16384,4.0>” at 2022-04-17 00:33:15,376, code location.
>    3. “Pod stream-1376a7c25e714f06b2ca818af964c45c-taskmanager-1-9 is
>    created.” at 2022-04-17 00:33:15,412, code location.
>    *The request is sent and the pod is created here, so I think the
>    apiserver was healthy at that moment.*
>    4. But I cannot find any logs that print in line and line.
>    5. “Discard registration from TaskExecutor
>    stream-1376a7c25e714f06b2ca818af964c45c-taskmanager-1-9” at 2022-04-17
>    00:33:32,393. Root cause of this log is due to the workerNodeMap

Re: KAFKA SINK ERROR IN Flink-(version 1.14.4)

2022-04-23 Thread Dian Fu
Hi Harshit,

Could you try updating the line `ds = ds.map(lambda x:
','.join([str(value) for value in x]))` to the following:
`ds = ds.map(lambda x: ','.join([str(value) for value in x]),
output_type=Types.STRING())`

The reason is that if the output type is not specified, the records will be
serialized using Pickle and so arrive as byte arrays. This works well for
intermediate transformations, as the output of the upstream operation will
also be deserialized using Pickle. However, when the output needs to be
written to a sink, the output type of the upstream operation must be
specified. See [1] for more details.

Regards,
Dian

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/datastream/data_types/#passing-python-records-to-java-operations
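
Put together, the suggested change is just the extra `output_type` argument
(a sketch, assuming the `ds` stream from the code below):

from pyflink.common import Types

# Declare the output type so records cross the Python-to-Java boundary as
# java.lang.String rather than Pickle-serialized byte arrays.
ds = ds.map(lambda x: ','.join([str(value) for value in x]),
            output_type=Types.STRING())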

On Sat, Apr 23, 2022 at 1:46 PM harshit.varsh...@iktara.ai
<harshit.varsh...@iktara.ai> wrote:

> Dear Team,
>
> I am new to PyFlink and would like your support with an issue I am facing.
> I am using PyFlink version 1.14.4 and reference code from the PyFlink
> getting started pages.
>
> I am getting the following error:
>
> py4j.protocol.Py4JJavaError: An error occurred while calling o10.execute.
>
> : org.apache.flink.runtime.client.JobExecutionException: Job execution
> failed.
>
> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed
> by NoRestartBackoffTimeStrategy
>
> Caused by: org.apache.flink.streaming.runtime.tasks.AsynchronousException:
> Caught exception while processing timer.
>
> Caused by:
> TimerException{org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator}
>
> Caused by:
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator
>
> Caused by: java.lang.ClassCastException: [B cannot be cast to
> java.lang.String
>
> Below is my code for reference:
>
> import os
>
> from pyflink.common import SimpleStringSchema, Types
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.datastream.connectors import FlinkKafkaConsumer, FlinkKafkaProducer
>
>
> def main():
>     env = StreamExecutionEnvironment.get_execution_environment()
>     env.set_parallelism(1)
>
>     # the sql connector for kafka is used here as it's a fat jar and could
>     # avoid dependency issues
>     kafka_jar = os.path.join(os.path.abspath(os.path.dirname(__file__)),
>                              'flink-sql-connector-kafka_2.11-1.14.4.jar')
>     env.add_jars("file:///{}".format(kafka_jar))
>
>     deserialization_schema = SimpleStringSchema()
>
>     # Test for kafka consumer
>     kafka_consumer = FlinkKafkaConsumer(
>         topics='test',
>         deserialization_schema=deserialization_schema,
>         properties={'bootstrap.servers': 'localhost:9093'})
>
>     ds = env.add_source(kafka_consumer)
>
>     # DATA USED IN KAFKA IS LIKE ('user1', 1, 2000)
>     ds = ds.map(lambda x: eval(x))
>     # no output_type is given here, so the records remain Pickle-serialized
>     # byte arrays when they reach the sink
>     ds = ds.map(lambda x: ','.join([str(value) for value in x]))
>
>     # ds.print()
>
>     kafka_producer = FlinkKafkaProducer(
>         topic='testresult',
>         serialization_schema=SimpleStringSchema(),
>         producer_config={'bootstrap.servers': 'localhost:9093',
>                          'group.id': 'fraud_test'})
>
>     ds.add_sink(kafka_producer)
>     env.execute('main')
>
>
> if __name__ == '__main__':
>     main()
>
> Thanks and Regards,
>
> Harshit


HTTP REST API as Ingress/Egress

2022-04-23 Thread Himanshu Sareen
Team,

Does flink-statefun support HTTP REST as an ingress (like Kafka and Kinesis)?

I'm looking for a fault-tolerant solution where an external API call can
invoke a stateful function, access its state, and return a response.

We are using the Python SDK for our StateFun application.

Regards,
Himanshu



Re: Unit testing PyFlink SQL project

2022-04-23 Thread Dian Fu
Hi John,

>> I don't know how to fix this. I've tried adding `flink-table-planner`
and `flink-table-planner-blink` dependencies with `test-jar`
to my dummy pom.xml, but it still fails.
What's the failure after doing this? The flink-table-planner*-tests.jar
should be available in the Maven repository [1].

>> This is starting to feel like a real pain to do something that should be
trivial: basic TDD of a PyFlink project.  Is there a real-world example of
a Python project that shows how to set up a testing environment for unit
testing SQL with PyFlink?
I'm not aware of such a project; however, I agree that this is an important
aspect that should be improved. I will look into this.

Regards,
Dian

[1]
https://repo1.maven.org/maven2/org/apache/flink/flink-table-planner_2.11/1.13.6/


On Sun, Apr 24, 2022 at 4:44 AM John Tipper  wrote:

> Hi all,
>
> Is there an example of a self-contained repository showing how to perform
> SQL unit testing of PyFlink (specifically 1.13.x if possible)?  I have
> cross-posted the question to Stack Overflow here:
> https://stackoverflow.com/questions/71983434/is-there-an-example-of-pyflink-sql-unit-testing-in-a-self-contained-repo
>
>
> There is a related SO question (
> https://stackoverflow.com/questions/69937520/pyflink-sql-local-test),
> where it is suggested to use some of the tests from PyFlink itself.  The
> issue I'm running into is that the PyFlink repo assumes that a bunch of
> things are on the Java classpath and that some Python utility classes are
> available (they're not distributed via PyPi apache-flink).
>
> I have done the following:
>
>
>    1. Copied `test_case_utils.py` and `source_sink_utils.py` from PyFlink
>    (https://github.com/apache/flink/tree/f8172cdbbc27344896d961be4b0b9cdbf000b5cd/flink-python/pyflink/testing)
>    into my project.
>    2. Copied an example unit test
>    (https://github.com/apache/flink/blob/f8172cdbbc27344896d961be4b0b9cdbf000b5cd/flink-python/pyflink/table/tests/test_sql.py#L39),
>    as suggested by the related SO question.
>    3. Tried to run the test, which fails because the test case cannot
>    determine which version of the Avro jars to download
>    (`download_apache_avro()` fails because pyflink_gateway_server.py tries
>    to evaluate the value of `avro.version` by running `mvn help:evaluate
>    -Dexpression=avro.version`).
>
> I then added a dummy `pom.xml` defining a Maven property of `avro.version`
> (with a value of `1.10.0`), and my unit test case now loads.
>
> I now get a new error and my test is skipped:
>
> 'flink-table-planner*-tests.jar' is not available. Will skip the
> related tests.
>
> I don't know how to fix this. I've tried adding `flink-table-planner` and
> `flink-table-planner-blink` dependencies with `test-jar` to my
> dummy pom.xml, but it still fails.
>
> This is starting to feel like a real pain to do something that should be
> trivial: basic TDD of a PyFlink project.  Is there a real-world example of
> a Python project that shows how to set up a testing environment for unit
> testing SQL with PyFlink?
>
> Many thanks,
>
> John
>


Unit testing PyFlink SQL project

2022-04-23 Thread John Tipper
Hi all,

Is there an example of a self-contained repository showing how to perform SQL 
unit testing of PyFlink (specifically 1.13.x if possible)?  I have cross-posted 
the question to Stack Overflow here: 
https://stackoverflow.com/questions/71983434/is-there-an-example-of-pyflink-sql-unit-testing-in-a-self-contained-repo


There is a related SO question 
(https://stackoverflow.com/questions/69937520/pyflink-sql-local-test), where it 
is suggested to use some of the tests from PyFlink itself.  The issue I'm 
running into is that the PyFlink repo assumes that a bunch of things are on the 
Java classpath and that some Python utility classes are available (they're not 
distributed via PyPi apache-flink).

I have done the following:


  1.  Copied `test_case_utils.py` and `source_sink_utils.py` from PyFlink
(https://github.com/apache/flink/tree/f8172cdbbc27344896d961be4b0b9cdbf000b5cd/flink-python/pyflink/testing)
into my project.
  2.  Copied an example unit test
(https://github.com/apache/flink/blob/f8172cdbbc27344896d961be4b0b9cdbf000b5cd/flink-python/pyflink/table/tests/test_sql.py#L39),
as suggested by the related SO question.
  3.  Tried to run the test, which fails because the test case cannot
determine which version of the Avro jars to download (`download_apache_avro()`
fails because pyflink_gateway_server.py tries to evaluate the value of
`avro.version` by running `mvn help:evaluate -Dexpression=avro.version`).

I then added a dummy `pom.xml` defining a Maven property of `avro.version`
(with a value of `1.10.0`), and my unit test case now loads.
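
For illustration, a dummy pom.xml along these lines is enough to satisfy that
`mvn help:evaluate` call (the groupId/artifactId values are placeholders; only
the `avro.version` property matters here):

<project xmlns="http://maven.apache.org/POM/4.0.0">
    <modelVersion>4.0.0</modelVersion>
    <groupId>example</groupId>
    <artifactId>pyflink-test-stub</artifactId>
    <version>0.1</version>
    <properties>
        <!-- queried by pyflink_gateway_server.py via mvn help:evaluate -->
        <avro.version>1.10.0</avro.version>
    </properties>
</project>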

I now get a new error and my test is skipped:

'flink-table-planner*-tests.jar' is not available. Will skip the related 
tests.

I don't know how to fix this. I've tried adding `flink-table-planner` and 
`flink-table-planner-blink` dependencies with `test-jar` to my 
dummy pom.xml, but it still fails.

This is starting to feel like a real pain to do something that should be 
trivial: basic TDD of a PyFlink project.  Is there a real-world example of a 
Python project that shows how to set up a testing environment for unit testing 
SQL with PyFlink?

Many thanks,

John


Re: Savepoint and cancel questions

2022-04-23 Thread Dan Hill
Hi Hangxiang.  Thanks!
1. Ah, okay.  It makes more sense considering FAILED.
2. Oh cool.  I'm migrating to v1.14.4 now.
3. Yes, this is great!


On Fri, Apr 22, 2022 at 8:05 PM Hangxiang Yu  wrote:

> Hi, Dan
> 1. Do you mean putting the option into the savepoint command? If so, I
> don't think it would work well. This option describes how checkpoints are
> cleaned up in different job statuses, e.g. FAILED/CANCELED, so it cannot
> be covered by the savepoint command.
> 2. Which Flink version do you use? I work on 1.14 and there is a
> confirmation dialog when I click "cancel job". You could double-check it.
> 3. From 1.15 on, the restore mode is introduced to solve this problem, as
> you can see in resuming-from-savepoints.
> I think the claim mode may be what you want.
>
> On Sat, Apr 23, 2022 at 7:38 AM Dan Hill  wrote:
>
>> Hi.
>>
>> 1. Why isn't --externalizedCheckpointCleanup an option on savepoint
>> (instead of being needed at the start of a job run)?
>>
>> 2. Can we get a confirmation dialog when someone clicks "cancel job" in
>> the UI?  Just in case people click on accident.
>>
>> 3. Can we get a way to have Flink clean up the previous
>> savepoint/retained checkpoint if a job successfully checkpoints?
>>
>
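
For reference, resuming with the claim restore mode mentioned above would look
roughly like this on the Flink 1.15 CLI (the savepoint path and jar name are
placeholders; check `flink run --help` on your version for the exact flag
spelling):

$ ./bin/flink run -s s3://my-bucket/savepoints/savepoint-ab12cd \
      -restoreMode claim my-job.jar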