Need help with executing Flink CLI for native Kubernetes deployment

2021-03-26 Thread Fuyao Li
Hi Community, Yang,

I am new to Flink on native Kubernetes and I am trying to do a POC for native 
Kubernetes application mode on Oracle Cloud Infrastructure. I was following the 
documentation here step by step: [1]

I am using Flink 1.12.1, Scala 2.11, and Java 11.
I was able to create a native Kubernetes deployment, but I am not able to run 
any further commands like list / cancel; I always run into a timeout error. 
I think the issue is that the JobManager web interface IP address printed 
after job deployment is not accessible. This also prevents me from shutting 
down the deployment with a savepoint. It could be a Kubernetes configuration 
issue. I have opened traffic on all related ports and validated the security 
list, but still couldn’t make it work. Any help is appreciated.


The relevant Flink source code is the CliFrontend.java class [2].
The ./bin/flink list and cancel commands try to send traffic to the Flink 
dashboard UI IP address and time out. I tried both the LoadBalancer and 
NodePort options for the -Dkubernetes.rest-service.exposed.type configuration. 
Neither of them works.

# List running job on the cluster (I can’t execute this command successfully 
due to timeout, logs shared below)
$ ./bin/flink list --target kubernetes-application 
-Dkubernetes.cluster-id=my-first-application-cluster
# Cancel running job (I can’t execute this command successfully either)
$ ./bin/flink cancel --target kubernetes-application 
-Dkubernetes.cluster-id=my-first-application-cluster 

I think those commands need to communicate with the endpoint that is shown 
after the job submission command.


  1. Use case 1 (deploy with NodePort)

# fuyli @ fuyli-mac in ~/Development/flink-1.12.1 [17:59:00] C:127
$ ./bin/flink run-application \
--target kubernetes-application \
-Dkubernetes.cluster-id=my-first-application-cluster \
-Dkubernetes.container.image=us-phoenix-1.ocir.io/idxglh0bz964/flink-demo:21.3.1 \
-Dkubernetes.container.image.pull-policy=IfNotPresent \
-Dkubernetes.container.image.pull-secrets=ocirsecret \
-Dkubernetes.rest-service.exposed.type=NodePort \
-Dkubernetes.service-account=flink-service-account \
local:///opt/flink/usrlib/quickstart-0.1.jar


When the exposed type is NodePort, the printed message says the Flink 
JobManager web interface is at http://192.29.104.156:30996. 192.29.104.156 is 
my Kubernetes API server address, and 30996 is the port that exposes the 
service. However, the Flink dashboard at this address is not reachable.
I can only access the dashboard UI on each node's IP address (there are three 
nodes in my K8s cluster):
100.104.154.73:30996
100.104.154.74:30996
100.104.154.75:30996
  I got the errors in [4] when trying to run the list command against such a 
native Kubernetes deployment. According to the documentation [3], this 
shouldn’t happen, since the Kubernetes API server address should also serve 
the Flink web UI… Did I miss any Kubernetes configuration needed to make the 
web UI available at the API server address?



  2. Use case 2 (deploy with LoadBalancer)
# fuyli @ fuyli-mac in ~/Development/flink-1.12.1 [17:59:00] C:127
$ ./bin/flink run-application \
--target kubernetes-application \
-Dkubernetes.cluster-id=my-first-application-cluster \
-Dkubernetes.container.image=us-phoenix-1.ocir.io/idxglh0bz964/flink-demo:21.3.1 \
-Dkubernetes.container.image.pull-policy=IfNotPresent \
-Dkubernetes.container.image.pull-secrets=ocirsecret \
-Dkubernetes.rest-service.exposed.type=LoadBalancer \
-Dkubernetes.service-account=flink-service-account \
local:///opt/flink/usrlib/quickstart-0.1.jar


After a while, once the external IP was resolved, it said the Flink JobManager 
web interface is at the external IP (load balancer address): 
http://144.25.13.78:8081
When I execute the list command, I still get an error after waiting long 
enough for it to time out. See the errors in [5].

I can still access the dashboard via the node IPs. In that case I tend to 
believe it is a network issue, but I am still quite confused since I have 
already opened all the traffic.
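
To isolate whether the REST endpoint itself is reachable independently of the CLI, a minimal diagnostic sketch along the following lines (using the node IP, NodePort, and cluster id mentioned above; this is only an illustration, not part of the original setup) could query the cluster directly through RestClusterClient:

```java
import java.util.Collection;

import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.client.JobStatusMessage;

public class ListJobsAgainstNodePort {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Point the REST client at an address that is actually reachable:
        // one of the node IPs plus the NodePort of the rest service.
        conf.set(RestOptions.ADDRESS, "100.104.154.73");
        conf.set(RestOptions.PORT, 30996);

        try (RestClusterClient<String> client =
                     new RestClusterClient<>(conf, "my-first-application-cluster")) {
            Collection<JobStatusMessage> jobs = client.listJobs().get();
            jobs.forEach(job -> System.out.println(job.getJobId() + " : " + job.getJobState()));
        }
    }
}
```

If this also times out against every node IP, the problem is almost certainly network-level rather than in the Flink CLI.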




Reference:
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html
[2] 
https://github.com/apache/flink/blob/f3155e6c0213de7bf4b58a89fb1e1331dee7701a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
[3] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html#accessing-flinks-web-ui
[4] https://pastebin.ubuntu.com/p/WcJMwds52r/
[5] https://pastebin.ubuntu.com/p/m27BnQGXQc/


Thanks for your help in advance.

Best regards,
Fuyao




Re: How to visualize the results of Flink processing or aggregation?

2021-03-26 Thread David Anderson
Prometheus is a metrics system; you can use Flink's Prometheus metrics
reporter to send metrics to Prometheus.

Grafana can also be connected to influxdb, and to databases like mysql and
postgresql, for which sinks are available.

And the Elasticsearch sink can be used to create visualizations with Kibana.

I'm sure there are other solutions as well, but these are some of the
popular ones.
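
For example, a minimal sketch of pushing aggregation results into PostgreSQL with the JDBC sink, so that Grafana can chart them (the table name, credentials, and sample records below are made up):

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ResultsToPostgres {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for the real aggregation output, e.g. (window start, long-ride count).
        env.fromElements(Tuple2.of("2021-03-26 10:00", 1L), Tuple2.of("2021-03-26 11:00", 3L))
                .addSink(JdbcSink.<Tuple2<String, Long>>sink(
                        "INSERT INTO long_ride_counts (window_start, ride_count) VALUES (?, ?)",
                        (statement, record) -> {
                            statement.setString(1, record.f0);
                            statement.setLong(2, record.f1);
                        },
                        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                                .withUrl("jdbc:postgresql://localhost:5432/metrics") // assumed database
                                .withDriverName("org.postgresql.Driver")
                                .withUsername("flink")
                                .withPassword("secret")
                                .build()));

        env.execute("results-to-postgres");
    }
}
```

Grafana can then read the long_ride_counts table through its PostgreSQL data source.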

Regards,
David

On Fri, Mar 26, 2021 at 5:15 PM Xiong Qiang 
wrote:

> Hi All,
>
> I am new to Flink, so forgive me if it is a naive question.
>
> The context is:
> We have a data streaming coming in, and we will use Flink applications to
> do the processing or aggregations. After the processing or aggregation, we
> need some approaches to visualize the results, to either build a dashboard
> or setup alerts, for example, using Prometheus and Grafana.
> However, after reading the documents (
> https://flink.apache.org/flink-architecture.html and more links) and
> examples (
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/try-flink/datastream_api.html)
> (
> https://github.com/ververica/flink-training/blob/master/long-ride-alerts/src/solution/java/org/apache/flink/training/solutions/longrides/LongRidesSolution.java)
> , *I am still not able to close the gap between Flink and a
> monitoring/dashboard tool, e.g. Prometheus/Grafana. *
>
> *The question is:*
> *How are processing results connected/sinked from Flink to
> Prometheus/Grafana? *for example, in the fraud detection example, how is
> the account id = 3, send to Prometheus and Grafana, so that I have a
> dashboard showing there is one suspected account? In the taxi long rides
> example, how do I send the count of long rides from Flink to
> Prometheus/Grafana?
>
> I understand there are sinks (
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/).
> However, I didn't see sinks for Prometheus.
>
> Hope I made my question clear.
>
> Thanks
>


Restore from Checkpoint from local Standalone Job

2021-03-26 Thread Sandeep khanzode
Hello


I was reading this: 
https://stackoverflow.com/questions/61010970/flink-resume-from-externalised-checkpoint-question


I am trying to run a standalone job on my local with a single job manager and 
task manager. 



I have enabled checkpointing as below:
env.setStateBackend(new RocksDBStateBackend("file:///Users/test/checkpoint", true));

env.enableCheckpointing(30 * 1000);

env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

env.getCheckpointConfig().setMinPauseBetweenCheckpoints(60 * 1000);

env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);


After I stop my job (I also tried to cancel the job using bin/flink cancel -s 
/Users/test/savepoint ), I tried to start the same job using…

./standalone-job.sh start-foreground test.jar --job-id   --job-classname 
com.test.MyClass --fromSavepoint /Users/test/savepoint


But it never restores the state, and always starts afresh. 


In Flink, I see this: 
StandaloneCompletedCheckpointStore
* {@link CompletedCheckpointStore} for JobManagers running in {@link 
HighAvailabilityMode#NONE}.
public void recover() throws Exception {
// Nothing to do
}

Does this have something to do with not being able to restore state?

Does this need Zookeeper or K8S HA for functioning?


Thanks,
Sandeep



Re: [External] : How to visualize the results of Flink processing or aggregation?

2021-03-26 Thread Fuyao Li
Hello Xiong,

You can expose monitoring data through Flink's metric system:
https://ci.apache.org/projects/flink/flink-docs-stable/ops/metrics.html
Metrics can be exported via metric reporters:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/metric_reporters.html
That includes Prometheus.

For the DataStream API, you can build customized metrics.
For the Flink SQL/Table API, you can only use the predefined metrics listed in 
Flink. There may be workarounds, but there is no direct way to supply custom 
metrics from Flink SQL.
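
As an illustration, a minimal sketch of a custom DataStream metric (the function and metric names are only examples); a configured reporter, e.g. the Prometheus one, would then expose it:

```java
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

// Counts how many suspicious account ids pass through this operator and
// publishes the count via Flink's metric system.
public class SuspiciousAccountCounter extends RichMapFunction<String, String> {

    private transient Counter suspiciousAccounts;

    @Override
    public void open(Configuration parameters) {
        suspiciousAccounts = getRuntimeContext()
                .getMetricGroup()
                .counter("suspiciousAccounts");
    }

    @Override
    public String map(String accountId) {
        suspiciousAccounts.inc();
        return accountId;
    }
}
```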

Best regards,
Fuyao

From: Xiong Qiang 
Date: Friday, March 26, 2021 at 09:15
To: user@flink.apache.org 
Subject: [External] : How to visualize the results of Flink processing or 
aggregation?
Hi All,

I am new to Flink, so forgive me if it is a naive question.

The context is:
We have a data streaming coming in, and we will use Flink applications to do 
the processing or aggregations. After the processing or aggregation, we need 
some approaches to visualize the results, to either build a dashboard or setup 
alerts, for example, using Prometheus and Grafana.
However, after reading the documents 
(https://flink.apache.org/flink-architecture.html
 and more links) and examples 
(https://ci.apache.org/projects/flink/flink-docs-release-1.12/try-flink/datastream_api.html)
 
(https://github.com/ververica/flink-training/blob/master/long-ride-alerts/src/solution/java/org/apache/flink/training/solutions/longrides/LongRidesSolution.java)
 , I am still not able to close the gap between Flink and a 
monitoring/dashboard tool, e.g. Prometheus/Grafana.

The question is:
How are processing results connected/sinked from Flink to Prometheus/Grafana? 
for example, in the fraud detection example, how is the account id = 3, send to 
Prometheus and Grafana, so that I have a dashboard showing there is one 
suspected account? In the taxi long rides example, how do I send the count of 
long rides from Flink to Prometheus/Grafana?

I understand there are sinks 
(https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/).
 However, I didn't see sinks for Prometheus.

Hope I made my question clear.

Thanks


How to visualize the results of Flink processing or aggregation?

2021-03-26 Thread Xiong Qiang
Hi All,

I am new to Flink, so forgive me if it is a naive question.

The context is:
We have a data streaming coming in, and we will use Flink applications to
do the processing or aggregations. After the processing or aggregation, we
need some approaches to visualize the results, to either build a dashboard
or setup alerts, for example, using Prometheus and Grafana.
However, after reading the documents (
https://flink.apache.org/flink-architecture.html and more links) and
examples (
https://ci.apache.org/projects/flink/flink-docs-release-1.12/try-flink/datastream_api.html)
(
https://github.com/ververica/flink-training/blob/master/long-ride-alerts/src/solution/java/org/apache/flink/training/solutions/longrides/LongRidesSolution.java)
, *I am still not able to close the gap between Flink and a
monitoring/dashboard tool, e.g. Prometheus/Grafana. *

*The question is:*
*How are processing results connected/sinked from Flink to
Prometheus/Grafana? *for example, in the fraud detection example, how is
the account id = 3, send to Prometheus and Grafana, so that I have a
dashboard showing there is one suspected account? In the taxi long rides
example, how do I send the count of long rides from Flink to
Prometheus/Grafana?

I understand there are sinks (
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/).
However, I didn't see sinks for Prometheus.

Hope I made my question clear.

Thanks


Re: [BULK]Re: [SURVEY] Remove Mesos support

2021-03-26 Thread Till Rohrmann
+1 for officially deprecating this component for the 1.13 release.

Cheers,
Till

On Thu, Mar 25, 2021 at 1:49 PM Konstantin Knauf  wrote:

> Hi Matthias,
>
> Thank you for following up on this. +1 to officially deprecate Mesos in
> the code and documentation, too. It will be confusing for users if this
> diverges from the roadmap.
>
> Cheers,
>
> Konstantin
>
> On Thu, Mar 25, 2021 at 12:23 PM Matthias Pohl 
> wrote:
>
>> Hi everyone,
>> considering the upcoming release of Flink 1.13, I wanted to revive the
>> discussion about the Mesos support ones more. Mesos is also already listed
>> as deprecated in Flink's overall roadmap [1]. Maybe, it's time to align
>> the
>> documentation accordingly to make it more explicit?
>>
>> What do you think?
>>
>> Best,
>> Matthias
>>
>> [1] https://flink.apache.org/roadmap.html#feature-radar
>>
>> On Wed, Oct 28, 2020 at 9:40 AM Till Rohrmann 
>> wrote:
>>
>> > Hi Oleksandr,
>> >
>> > yes you are right. The biggest problem is at the moment the lack of test
>> > coverage and thereby confidence to make changes. We have some e2e tests
>> > which you can find here [1]. These tests are, however, quite coarse
>> grained
>> > and are missing a lot of cases. One idea would be to add a Mesos e2e
>> test
>> > based on Flink's end-to-end test framework [2]. I think what needs to be
>> > done there is to add a Mesos resource and a way to submit jobs to a
>> Mesos
>> > cluster to write e2e tests.
>> >
>> > [1] https://github.com/apache/flink/tree/master/flink-jepsen
>> > [2]
>> >
>> https://github.com/apache/flink/tree/master/flink-end-to-end-tests/flink-end-to-end-tests-common
>> >
>> > Cheers,
>> > Till
>> >
>> > On Tue, Oct 27, 2020 at 12:29 PM Oleksandr Nitavskyi <
>> > o.nitavs...@criteo.com> wrote:
>> >
>> >> Hello Xintong,
>> >>
>> >> Thanks for the insights and support.
>> >>
>> >> Browsing the Mesos backlog and didn't identify anything critical, which
>> >> is left there.
>> >>
>> >> I see that there were quite a lot of contributions to the Flink
>> Mesos
>> >> in the recent version:
>> >> https://github.com/apache/flink/commits/master/flink-mesos.
>> >> We plan to validate the current Flink master (or release 1.12 branch)
>> our
>> >> Mesos setup. In case of any issues, we will try to propose changes.
>> >> My feeling is that our test results shouldn't affect the Flink 1.12
>> >> release cycle. And if any potential commits will land into the 1.12.1
>> it
>> >> should be totally fine.
>> >>
>> >> In the future, we would be glad to help you guys with any
>> >> maintenance-related questions. One of the highest priorities around
>> this
>> >> component seems to be the development of the full e2e test.
>> >>
>> >> Kind Regards
>> >> Oleksandr Nitavskyi
>> >> 
>> >> From: Xintong Song 
>> >> Sent: Tuesday, October 27, 2020 7:14 AM
>> >> To: dev ; user 
>> >> Cc: Piyush Narang 
>> >> Subject: [BULK]Re: [SURVEY] Remove Mesos support
>> >>
>> >> Hi Piyush,
>> >>
>> >> Thanks a lot for sharing the information. It would be a great relief
>> that
>> >> you are good with Flink on Mesos as is.
>> >>
>> >> As for the jira issues, I believe the most essential ones should have
>> >> already been resolved. You may find some remaining open issues here
>> [1],
>> >> but not all of them are necessary if we decide to keep Flink on Mesos
>> as is.
>> >>
>> >> At the moment and in the short future, I think help is mostly needed
>> on
>> >> testing the upcoming release 1.12 with Mesos use cases. The community
>> is
>> >> currently actively preparing the new release, and hopefully we could
>> come
>> >> up with a release candidate early next month. It would be greatly
>> >> appreciated if you folks as experienced Flink on Mesos users can help
>> with
>> >> verifying the release candidates.
>> >>
>> >>
>> >> Thank you~
>> >>
>> >> Xintong Song
>> >>
>> >> [1]
>> >>
>> https://issues.apache.org/jira/browse/FLINK-17402?jql=project%20%3D%20FLINK%20AND%20component%20%3D%20%22Deployment%20%2F%20Mesos%22%20AND%20status%20%3D%20Open
>> >> <
>> >>
>> https://eur03.safelinks.protection.outlook.com/?url=https%3A%2F%2Fissues.apache.org%2Fjira%2Fbrowse%2FFLINK-17402%3Fjql%3Dproject%2520%253D%2520FLINK%2520AND%2520component%2520%253D%2520%2522Deployment%2520%252F%2520Mesos%2522%2520AND%2520status%2520%253D%2520Open=04%7C01%7Co.nitavskyi%40criteo.com%7C3585e1f25bdf4e091af808d87a3f92db%7C2a35d8fd574d48e3927c8c398e225a01%7C1%7C0%7C637393760750820881%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C1000=hytJFQE0MCPzMLiQTQTdbg3GVckX5M3r1NPRGrRV8j4%3D=0
>> >> >
>> >>
>> >> On Tue, Oct 27, 2020 at 2:58 AM Piyush Narang > >> > wrote:
>> >>
>> >> Hi Xintong,
>> >>
>> >>
>> >>
>> >> Do you have any jiras that cover any of the items on 1 or 2? I can
>> reach
>> >> out to folks internally and see if I can get some folks to commit to
>> >> helping out.
>> >>
>> >>
>> >>
>> >> To cover the other qs:
>> >>
>> >>   *   

Re: State size increasing exponentially in Flink v1.9

2021-03-26 Thread Yun Tang
Hi,

If you are using the RocksDB state backend, why would `CopyOnWriteStateMap` show up at all?

CopyOnWriteStateMap should only exist in a heap-based state backend.

Best
Yun Tang


From: Chesnay Schepler 
Sent: Friday, March 26, 2021 18:45
To: Almeida, Julius ; user@flink.apache.org 

Subject: Re: State size increasing exponentially in Flink v1.9

Could you show us how you interact with the map state (ideally the full code of 
your function that accesses the state)?

On 3/25/2021 1:13 AM, Almeida, Julius wrote:

Hey,

Hope you all are doing well!



I am using flink v1.9 with RocksDBStateBackend, but over time the state size is 
increasing exponentially.



I am using MapState in my project & seeing memory spike, after looking at heap 
dump I see duplicates in it.



I also have logic added to remove expired events from the MapState

Eg.: MapState.remove(key)



Can anyone give me pointers to find more details on it.



Heap Dump pointed to 
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateMap.java#L811



Thanks,

Julius



Re: Question about checkpoints and savepoints

2021-03-26 Thread Robert Cullen
Here’s a snippet from the logs; there are no errors in them.

2021-03-23 13:11:52,247 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -

2021-03-23 13:11:52,249 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
Preconfiguration:
2021-03-23 13:11:52,249 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -

JM_RESOURCE_PARAMS extraction logs:
jvm_params: -Xmx2097152000 -Xms2097152000 -XX:MaxMetaspaceSize=268435456
logs: INFO  [] - Loading configuration property:
jobmanager.rpc.address, flink-jobmanager
INFO  [] - Loading configuration property: taskmanager.numberOfTaskSlots, 4
INFO  [] - Loading configuration property: blob.server.port, 6124
INFO  [] - Loading configuration property: jobmanager.rpc.port, 6123
INFO  [] - Loading configuration property: taskmanager.rpc.port, 6122
INFO  [] - Loading configuration property: queryable-state.proxy.ports, 6125
INFO  [] - Loading configuration property: jobmanager.memory.heap.size, 2000m
INFO  [] - Loading configuration property:
taskmanager.memory.task.heap.size, 2000m
INFO  [] - Loading configuration property:
taskmanager.memory.managed.size, 3000m
INFO  [] - Loading configuration property: parallelism.default, 2
INFO  [] - Loading configuration property: state.backend, filesystem
INFO  [] - Loading configuration property: state.checkpoints.dir,
s3://flink/checkpoints
INFO  [] - Loading configuration property: state.savepoints.dir,
s3://flink/savepoints
INFO  [] - Loading configuration property: s3.endpoint, http://cmdaa-minio:9000
INFO  [] - Loading configuration property: s3.path-style-access, true
INFO  [] - Loading configuration property: s3.path.style.access, true
INFO  [] - Loading configuration property: s3.access-key, cmdaa123
INFO  [] - Loading configuration property: s3.secret-key, **
INFO  [] - Final Master Memory configuration:
INFO  [] -   Total Process Memory: 2.587gb (2777561320 bytes)
INFO  [] - Total Flink Memory: 2.078gb (2231369728 bytes)
INFO  [] -   JVM Heap: 1.953gb (2097152000 bytes)
INFO  [] -   Off-heap: 128.000mb (134217728 bytes)
INFO  [] - JVM Metaspace:  256.000mb (268435456 bytes)
INFO  [] - JVM Overhead:   264.889mb (277756136 bytes)


On Fri, Mar 26, 2021 at 4:03 AM Robert Metzger  wrote:

> Hi,
>
> has the "state.savepoints.dir" configuration key the same value as
> "state.checkpoints.dir"?
> If not, can you post your configuration keys, and the invocation how you
> trigger a savepoint?
> Have you checked the logs? Maybe there's an error message?
>
> On Thu, Mar 25, 2021 at 7:17 PM Robert Cullen 
> wrote:
>
>> When I run a job on my Kubernetes session cluster only the checkpoint
>> directories are created but not the savepoints. (Filesystem configured to
>> S3 Minio)  Any ideas?
>>
>> --
>> Robert Cullen
>> 240-475-4490
>>
>

-- 
Robert Cullen
240-475-4490


Re: Could not find a suitable table factory for 'org.apache.flink.table.factories.CatalogFactory' in the classpath

2021-03-26 Thread Chesnay Schepler

Please try copying the connector jar into the lib/ or opt/ directory.

On 3/26/2021 11:59 AM, Yik San Chan wrote:
This question is cross-posted on Stack Overflow 
https://stackoverflow.com/questions/66815572/could-not-find-a-suitable-table-factory-for-org-apache-flink-table-factories-ca 
. 



I am running a PyFlink program that reads from Hive `mysource` table, 
does some processing, then writes to Hive `mysink` table.


```
hive (aiinfra)> describe mysource;
OK
a                   bigint
b                   bigint

hive (aiinfra)> describe mysink;
OK
c                   bigint
```

This is my tree.

```
.
├── deps
│   ├── flink-sql-connector-hive-2.2.0_2.11-1.12.0.jar
├── hive.py
```

This is the `hive.py`.

```python
import os
from pyflink.table import *
from pyflink.table.catalog import HiveCatalog

settings = 
EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()

t_env = BatchTableEnvironment.create(environment_settings=settings)

t_env.get_config().get_configuration().set_string(
    "pipeline.jars", 
f"file://{os.getcwd()}/deps/flink-sql-connector-hive-2.2.0_2.11-1.12.0.jar"

)

catalog_name = "myhive"
default_database = "aiinfra"
hive_conf_dir = "/data/apache/hive/apache-hive-2.1.0-bin/conf"

hive_catalog = HiveCatalog(catalog_name, default_database, hive_conf_dir)
t_env.register_catalog("myhive", hive_catalog)

# set the HiveCatalog as the current catalog of the session
t_env.use_catalog("myhive")

TRANSFORM_DML = """
INSERT INTO mysink
SELECT a + b
FROM mysource
"""

t_env.get_config().set_sql_dialect(SqlDialect.DEFAULT)
t_env.execute_sql(TRANSFORM_DML).wait()
```

The above program works fine, until I turn the catalog registration 
logic into SQL.


```python
import os
from pyflink.table import *
from pyflink.table.catalog import HiveCatalog

settings = 
EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()

t_env = BatchTableEnvironment.create(environment_settings=settings)

t_env.get_config().get_configuration().set_string(
    "pipeline.jars", 
f"file://{os.getcwd()}/deps/flink-sql-connector-hive-2.2.0_2.11-1.12.0.jar"

)

CREATE_CATALOG_DDL = """
CREATE CATALOG myhive WITH (
    'type' = 'hive',
    'default-database' = 'aiinfra',
    'hive-conf-dir' = '/data/apache/hive/apache-hive-2.1.0-bin/conf'
)
"""

USE_CATALOG_DDL = """
USE CATALOG myhive
"""

TRANSFORM_DML = """
INSERT INTO mysink
SELECT a + b
FROM mysource
"""

t_env.get_config().set_sql_dialect(SqlDialect.HIVE)
t_env.execute_sql(CREATE_CATALOG_DDL)
t_env.execute_sql(USE_CATALOG_DDL)

t_env.get_config().set_sql_dialect(SqlDialect.DEFAULT)
t_env.execute_sql(TRANSFORM_DML).wait()
```

Running the latter version `python hive.py` throws exception:

```
Traceback (most recent call last):
  File 
"/data/home/pal-flink/chenyisheng14418/feature-pipelines/pyflink/hive.py", 
line 42, in 

    t_env.execute_sql(SOURCE_DDL_1)
  File 
"/data/apache/flink/flink-1.12.0/opt/python/pyflink.zip/pyflink/table/table_environment.py", 
line 766, in execute_sql
  File 
"/data/apache/flink/flink-1.12.0/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", 
line 1286, in __call__
  File 
"/data/apache/flink/flink-1.12.0/opt/python/pyflink.zip/pyflink/util/exceptions.py", 
line 147, in deco
  File 
"/data/apache/flink/flink-1.12.0/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", 
line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling 
o10.executeSql.
: org.apache.flink.table.api.NoMatchingTableFactoryException: Could 
not find a suitable table factory for 
'org.apache.flink.table.factories.CatalogFactory' in

the classpath.

Reason: Required context properties mismatch.

The following properties are requested:
default-database=aiinfra
hive-conf-dir=/data/apache/hive/apache-hive-2.1.0-bin/conf
type=hive

The following factories have been considered:
org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
at 
org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
at 
org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
at 
org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
at 
org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.createCatalog(TableEnvironmentImpl.java:1078)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:991)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:665)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)


Could not find a suitable table factory for 'org.apache.flink.table.factories.CatalogFactory' in the classpath

2021-03-26 Thread Yik San Chan
This question is cross-posted on Stack Overflow
https://stackoverflow.com/questions/66815572/could-not-find-a-suitable-table-factory-for-org-apache-flink-table-factories-ca
.

I am running a PyFlink program that reads from Hive `mysource` table, does
some processing, then writes to Hive `mysink` table.

```
hive (aiinfra)> describe mysource;
OK
a   bigint
b   bigint

hive (aiinfra)> describe mysink;
OK
c   bigint
```

This is my tree.

```
.
├── deps
│   ├── flink-sql-connector-hive-2.2.0_2.11-1.12.0.jar
├── hive.py
```

This is the `hive.py`.

```python
import os
from pyflink.table import *
from pyflink.table.catalog import HiveCatalog

settings =
EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
t_env = BatchTableEnvironment.create(environment_settings=settings)

t_env.get_config().get_configuration().set_string(
"pipeline.jars",
f"file://{os.getcwd()}/deps/flink-sql-connector-hive-2.2.0_2.11-1.12.0.jar"
)

catalog_name = "myhive"
default_database = "aiinfra"
hive_conf_dir = "/data/apache/hive/apache-hive-2.1.0-bin/conf"

hive_catalog = HiveCatalog(catalog_name, default_database, hive_conf_dir)
t_env.register_catalog("myhive", hive_catalog)

# set the HiveCatalog as the current catalog of the session
t_env.use_catalog("myhive")

TRANSFORM_DML = """
INSERT INTO mysink
SELECT a + b
FROM mysource
"""

t_env.get_config().set_sql_dialect(SqlDialect.DEFAULT)
t_env.execute_sql(TRANSFORM_DML).wait()
```

The above program works fine, until I turn the catalog registration logic
into SQL.

```python
import os
from pyflink.table import *
from pyflink.table.catalog import HiveCatalog

settings =
EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
t_env = BatchTableEnvironment.create(environment_settings=settings)

t_env.get_config().get_configuration().set_string(
"pipeline.jars",
f"file://{os.getcwd()}/deps/flink-sql-connector-hive-2.2.0_2.11-1.12.0.jar"
)

CREATE_CATALOG_DDL = """
CREATE CATALOG myhive WITH (
'type' = 'hive',
'default-database' = 'aiinfra',
'hive-conf-dir' = '/data/apache/hive/apache-hive-2.1.0-bin/conf'
)
"""

USE_CATALOG_DDL = """
USE CATALOG myhive
"""

TRANSFORM_DML = """
INSERT INTO mysink
SELECT a + b
FROM mysource
"""

t_env.get_config().set_sql_dialect(SqlDialect.HIVE)
t_env.execute_sql(CREATE_CATALOG_DDL)
t_env.execute_sql(USE_CATALOG_DDL)

t_env.get_config().set_sql_dialect(SqlDialect.DEFAULT)
t_env.execute_sql(TRANSFORM_DML).wait()
```

Running the latter version `python hive.py` throws exception:

```
Traceback (most recent call last):
  File
"/data/home/pal-flink/chenyisheng14418/feature-pipelines/pyflink/hive.py",
line 42, in 
t_env.execute_sql(SOURCE_DDL_1)
  File
"/data/apache/flink/flink-1.12.0/opt/python/pyflink.zip/pyflink/table/table_environment.py",
line 766, in execute_sql
  File
"/data/apache/flink/flink-1.12.0/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
line 1286, in __call__
  File
"/data/apache/flink/flink-1.12.0/opt/python/pyflink.zip/pyflink/util/exceptions.py",
line 147, in deco
  File
"/data/apache/flink/flink-1.12.0/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py",
line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o10.executeSql.
: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not
find a suitable table factory for
'org.apache.flink.table.factories.CatalogFactory' in
the classpath.

Reason: Required context properties mismatch.

The following properties are requested:
default-database=aiinfra
hive-conf-dir=/data/apache/hive/apache-hive-2.1.0-bin/conf
type=hive

The following factories have been considered:
org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
at
org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
at
org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
at
org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
at
org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.createCatalog(TableEnvironmentImpl.java:1078)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:991)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:665)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at 

Re: Fail to cancel perJob for that deregisterApplication is not called

2021-03-26 Thread Chesnay Schepler

Where exactly did you add your own log message?

WebMonitorEndpoint.closeAsync() already logs on its own whether the 
shutdown future was completed, meaning that it shouldn't have been 
necessary to add a separate log message.
If you now only see the one you added, chances are that it was added at 
the wrong place.


On 3/24/2021 5:06 AM, 刘建刚 wrote:
      I am using Flink 1.10.0. My per-job cluster cannot be cancelled. From 
the log I can see that webMonitorEndpoint.closeAsync() is completed but 
deregisterApplication is not called. The related code is as follows:

public CompletableFuture<Void> deregisterApplicationAndClose(
        final ApplicationStatus applicationStatus, final @Nullable String diagnostics) {

    if (isRunning.compareAndSet(true, false)) {
        final CompletableFuture<Void> closeWebMonitorAndDeregisterAppFuture =
            FutureUtils.composeAfterwards(
                webMonitorEndpoint.closeAsync(),
                () -> deregisterApplication(applicationStatus, diagnostics, resourceManager.getJobId()));
        return FutureUtils.composeAfterwards(closeWebMonitorAndDeregisterAppFuture, this::closeAsyncInternal);
    } else {
        return terminationFuture;
    }
}
      For webMonitorEndpoint.closeAsync(), the code is as follows:
public CompletableFuture<Void> closeAsync() {
    synchronized (lock) {
        log.info("State is {}. Shutting down rest endpoint.", state);

        if (state == State.RUNNING) {
            final CompletableFuture<Void> shutDownFuture = FutureUtils.composeAfterwards(
                closeHandlersAsync(), this::shutDownInternal);

            shutDownFuture.whenComplete(
                (Void ignored, Throwable throwable) -> {
                    log.info("Shut down complete.");
                    if (throwable != null) {
                        terminationFuture.completeExceptionally(throwable);
                    } else {
                        terminationFuture.complete(null);
                    }
                });

            state = State.SHUTDOWN;
        } else if (state == State.CREATED) {
            terminationFuture.complete(null);
            state = State.SHUTDOWN;
        }

        return terminationFuture;
    }
}
      I am sure that it completed, based on the log line I added:
[screenshot of the added log output omitted]
     For deregisterApplication, I do not see any related log like 
"Shut down cluster because application is in {}, diagnostics {}.".

      Can anyone give me some suggestions? Thank you.





Re: State size increasing exponentially in Flink v1.9

2021-03-26 Thread Chesnay Schepler
Could you show us how you interact with the map state (ideally the full 
code of your function that accesses the state)?


On 3/25/2021 1:13 AM, Almeida, Julius wrote:


Hey,

Hope you all are doing well!

I am using flink v1.9 with RocksDBStateBackend, but over time the 
state size is increasing exponentially.


I am using MapState in my project & seeing memory spike, after looking 
at heap dump I see duplicates in it.


I also have logic added to remove expired events from the MapState

Eg.: MapState.remove(key)

Can anyone give me pointers to find more details on it.

Heap Dump pointed to 
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateMap.java#L811 



Thanks,

Julius





Re: Flink SQL client: SELECT 'hello world' throws [ERROR] Could not execute SQL statement

2021-03-26 Thread Yik San Chan
Thank you for the information.

On Fri, Mar 26, 2021 at 5:14 PM David Anderson  wrote:

> There needs to be a Flink session cluster available to the SQL client on
> which it can run the jobs created by your queries. See the Getting Started
> [1] section of the SQL Client documentation for more information:
>
> The SQL Client is bundled in the regular Flink distribution and thus
> runnable out-of-the-box. It requires only a running Flink cluster where
> table programs can be executed. For more information about setting up a
> Flink cluster see the Cluster & Deployment part [2]. If you simply want to
> try out the SQL Client, you can also start a local cluster with one worker
> using the following command:
>
> ./bin/start-cluster.sh
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sqlClient.html#getting-started
> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/standalone
>
> Regards,
> David
>
> On Fri, Mar 26, 2021 at 9:50 AM Yik San Chan 
> wrote:
>
>> The question is cross-posted in Stack Overflow
>> https://stackoverflow.com/questions/66813644/flink-sql-client-select-hello-world-throws-error-could-not-execute-sql-stat
>> .
>>
>> I am following Flink SQL client docs
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sqlClient.html#dependencies.
>> Opening the Flink SQL client looks fine, but `SELECT 'hello world'` doesn't
>> work.
>>
>> ```
>> $ /data/apache/flink/flink-1.12.0/bin/sql-client.sh embedded
>> SLF4J: Class path contains multiple SLF4J bindings.
>> SLF4J: Found binding in
>> [jar:file:/data/apache/flink/flink-1.12.0/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> SLF4J: Found binding in
>> [jar:file:/data/apache/hadoop/hadoop-2.7.3/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>> explanation.
>> SLF4J: Actual binding is of type
>> [org.apache.logging.slf4j.Log4jLoggerFactory]
>> No default environment specified.
>> Searching for
>> '/data/apache/flink/flink-1.12.0/conf/sql-client-defaults.yaml'...found.
>> Reading default environment from:
>> file:/data/apache/flink/flink-1.12.0/conf/sql-client-defaults.yaml
>> No session environment specified.
>>
>> Command history file path: /data/home/pal-flink/.flink-sql-history
>>▒▓██▓██▒
>>▓▒▒█▓▒▓███▓▒
>> ▓███▓░░▒▒▒▓██▒  ▒
>>   ░██▒   ▒▒▓▓█▓▓▒░  ▒
>>   ██▒ ░▒▓███▒▒█▒█▒
>> ░▓████   ▓░▒██
>>   ▓█   ▒▓██▓░▒░▓▓█
>> █░ █   ▒▒░   ███▓▓█ ▒█▒▒▒
>> ░   ▒▓█▓  ██▒▒▒ ▓███▒
>>  ░▒█▓▓██   ▓█▒▓█▒▓██▓ ░█░
>>▓░▒▓▒ ██ ▒██▓░▒█▒░▒█▒
>>   ███▓░██▓  ▓█   █   █▓ ▒▓█▓▓█▒
>> ░██▓  ░█░█  █▒ ▒█▓▒ ██▓░▒
>>███░ ░ █░  ▓ ░█ █▒░░░█░▓  ▓░
>>   ██▓█ ▒▒▓▒  ▓███▓░   ▒█▒ ▒▓ ▓██▓
>>▒██▓ ▓█ █▓█   ░▒█▓▓▒░ ██▒▒  █ ▒  ▓█▒
>>▓█▓  ▓█ ██▓ ░▓▓▓▒  ▒██▓   ░█▒
>>▓██ ▓███▓▒░  ░▓▓▓███▓  ░▒░ ▓█
>>██▓██▒░▒▓▓███▓██▓▒▓███  █
>>   ▓███▒ ███   ░▓▓▒░░   ░▓▓░  ░▒▓▒  █▓
>>   █▓▒▒▓▓██  ░▒▒░░░▓██▓░█▓
>>   ██ ▓░▒█   ▒░░  ▒█▓   ▒▓▓██▓▓▒  ▒▒▓
>>   ▓█▓ ▓▒█  █▓░  ░▒▓▓██▒░▓█▒   ▒▒▒░▒▒▓█▒
>>██░ ▓█▒█▒  ▒▓▓▒  ▓██░     ░█▒
>>▓█   ▒█▓   ░ █░▒█  █▓
>> █▓   ██ █░ ▓▓▒█▓▓▓▒█░
>>  █▓ ░▓██░   ▓▒  ▓█▓▒░░░▒▓█░▒█
>>   ██   ▓█▓░  ▒░▒█▒██▒  ▓▓
>>▓█▒   ▒█▓▒░ ▒▒ █▒█▓▒▒░░▒██
>> ░██▒▒▓▓▒ ▓██▓▒█▒ ░▒█▓
>>   ░▓██▒  ▓░  ▒█▓█  ░░▒▒▒
>>   ▒▓▒▒▒░░▓▓  ▓░▒█░
>>
>> __ _ _   _   _    _ _ _ _
>>  _  BETA
>>|  | (_) | | / |/ __ \| |   / | (_)  |
>> |
>>| |__  | |_ _ __ | | __ | (___ | |  | | |  | || |_  ___ _ __ |
>> |_
>>|  __| | | | '_ \| |/ /  \___ \| |  | | |  | || | |/ _ \ '_ \|
>> __|
>>| || | | | | |   <   ) | |__| | |  | || | |  __/ | | |
>> |_
>>|_||_|_|_| |_|_|\_\ |_/ \___\_\__|  \_|_|_|\___|_|
>> |_|\__|
>>
>> Welcome! Enter 'HELP;' to list all available 

Re: Hadoop is not in the classpath/dependencies

2021-03-26 Thread Robert Metzger
Hey Matthias,

Maybe the classpath contains hadoop libraries, but not the HDFS libraries?
The "DistributedFileSystem" class needs to be accessible to the
classloader. Can you check if that class is available?
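
For example, a quick throwaway check like the sketch below (run it with the same classpath as the JobManager) shows whether the HDFS client implementation can be loaded at all:

```java
public class HdfsClasspathCheck {
    public static void main(String[] args) {
        try {
            // This is the class Flink's Hadoop file system wrapper ultimately needs for hdfs:// URIs.
            Class.forName("org.apache.hadoop.hdfs.DistributedFileSystem");
            System.out.println("HDFS client classes are on the classpath.");
        } catch (ClassNotFoundException e) {
            System.out.println("hadoop-hdfs / hadoop-hdfs-client is missing from the classpath.");
        }
    }
}
```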

Best,
Robert

On Thu, Mar 25, 2021 at 11:10 AM Matthias Seiler <
matthias.sei...@campus.tu-berlin.de> wrote:

> Hello everybody,
>
> I set up a Flink (1.12.1) and Hadoop (3.2.1) cluster on two machines.
> The job should store the checkpoints on HDFS like so:
> ```java
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.enableCheckpointing(15000, CheckpointingMode.EXACTLY_ONCE);
> env.setStateBackend(new FsStateBackend("hdfs://node-1:9000/flink"));
> ```
>
> Unfortunately, the JobManager throws
> ```
> org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not
> find a file system implementation for scheme 'hdfs'. The scheme is not
> directly supported by Flink and no Hadoop file system to support this
> scheme could be loaded. For a full list of supported file systems,
> please see
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.
> // ...
> Caused by:
> org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is
> not in the classpath/dependencies.
> ```
> and I don't understand why.
>
> `echo $HADOOP_CLASSPATH` returns the path of Hadoop libraries with
> wildcards. Flink's JobManager prints the classpath, which includes
> specific packages from these Hadoop libraries. Besides that, Flink
> creates the state directories on HDFS, but no content.
>
> Thank you for any advice,
> Matthias
>
>


Re: Flink SQL client: SELECT 'hello world' throws [ERROR] Could not execute SQL statement

2021-03-26 Thread David Anderson
There needs to be a Flink session cluster available to the SQL client on
which it can run the jobs created by your queries. See the Getting Started
[1] section of the SQL Client documentation for more information:

The SQL Client is bundled in the regular Flink distribution and thus
runnable out-of-the-box. It requires only a running Flink cluster where
table programs can be executed. For more information about setting up a
Flink cluster see the Cluster & Deployment part [2]. If you simply want to
try out the SQL Client, you can also start a local cluster with one worker
using the following command:

./bin/start-cluster.sh


[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sqlClient.html#getting-started
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/standalone

Regards,
David

On Fri, Mar 26, 2021 at 9:50 AM Yik San Chan 
wrote:

> The question is cross-posted in Stack Overflow
> https://stackoverflow.com/questions/66813644/flink-sql-client-select-hello-world-throws-error-could-not-execute-sql-stat
> .
>
> I am following Flink SQL client docs
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sqlClient.html#dependencies.
> Opening the Flink SQL client looks fine, but `SELECT 'hello world'` doesn't
> work.
>
> ```
> $ /data/apache/flink/flink-1.12.0/bin/sql-client.sh embedded
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in
> [jar:file:/data/apache/flink/flink-1.12.0/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in
> [jar:file:/data/apache/hadoop/hadoop-2.7.3/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> explanation.
> SLF4J: Actual binding is of type
> [org.apache.logging.slf4j.Log4jLoggerFactory]
> No default environment specified.
> Searching for
> '/data/apache/flink/flink-1.12.0/conf/sql-client-defaults.yaml'...found.
> Reading default environment from:
> file:/data/apache/flink/flink-1.12.0/conf/sql-client-defaults.yaml
> No session environment specified.
>
> Command history file path: /data/home/pal-flink/.flink-sql-history
> [Flink SQL Client ASCII-art banner omitted]
>
> Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to
> exit.
>
>
> Flink SQL> SELECT 'Hello World';
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit
> JobGraph.

Re: reading from jdbc connection

2021-03-26 Thread Robert Metzger
Hey Arran,

It seems that the preferred way, even in the Java API, is to use a DDL
statement:
https://github.com/apache/flink/blob/master/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java#L602-L639
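
For illustration, a minimal sketch of that approach (table name, columns, and credentials are made up; it assumes the flink-connector-jdbc and MySQL driver jars are on the classpath):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class JdbcTableExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Register the MySQL table with a DDL statement instead of the old connect() API.
        tEnv.executeSql(
                "CREATE TABLE customers (" +
                "  id BIGINT," +
                "  name STRING" +
                ") WITH (" +
                "  'connector' = 'jdbc'," +
                "  'url' = 'jdbc:mysql://localhost:3306/mydb'," +
                "  'table-name' = 'customers'," +
                "  'username' = 'flink'," +
                "  'password' = 'secret'" +
                ")");

        // The registered table can now be queried (or joined against a stream).
        Table result = tEnv.sqlQuery("SELECT id, name FROM customers");
        result.execute().print();
    }
}
```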


Hope this helps!

Best,
Robert

On Thu, Mar 25, 2021 at 2:40 PM Arran Duff  wrote:

> Hi,
>
> I'm quite new to flink and I'm trying to create an application, which
> reads ID's from a kinesis stream and then uses these to read from a mysql
> database. I expect that I would just be doing a join of the id's onto the
> table
>
> I'm struggling to understand from the documentation how to actually
> connect to jdbc from flink using java.  For example - the code shown here
> 
> doesn't give any information about what to provide as arguments in the
> connect method. Reading the javadoc I can see that it needs to be a
> ConnectorDescriptor
> 
> object. And I can see that the known subclasses include a HBase, Kafka and
> Hbase connector. But I don't see one for JDBC. Should I just be using the
> CustomConnectorDescriptor
> 
>  and
> adding JDBC connection options? Will this work out of the box or am I going
> down a rabbit hole?
>
> I also note that all of the examples that I see for the jdbc connector are
> written in SQL, or DDL or yaml - for example here
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connect.html#jdbc-connector
> I'm not quite sure how I would go about getting that working in java. Any
> help would be greatly appreciated
>
> Thanks,
> Arran
>


Flink SQL client: SELECT 'hello world' throws [ERROR] Could not execute SQL statement

2021-03-26 Thread Yik San Chan
The question is cross-posted in Stack Overflow
https://stackoverflow.com/questions/66813644/flink-sql-client-select-hello-world-throws-error-could-not-execute-sql-stat
.

I am following Flink SQL client docs
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sqlClient.html#dependencies.
Opening the Flink SQL client looks fine, but `SELECT 'hello world'` doesn't
work.

```
$ /data/apache/flink/flink-1.12.0/bin/sql-client.sh embedded
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/data/apache/flink/flink-1.12.0/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/data/apache/hadoop/hadoop-2.7.3/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
explanation.
SLF4J: Actual binding is of type
[org.apache.logging.slf4j.Log4jLoggerFactory]
No default environment specified.
Searching for
'/data/apache/flink/flink-1.12.0/conf/sql-client-defaults.yaml'...found.
Reading default environment from:
file:/data/apache/flink/flink-1.12.0/conf/sql-client-defaults.yaml
No session environment specified.

Command history file path: /data/home/pal-flink/.flink-sql-history
[Flink SQL Client ASCII-art banner omitted]

Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to
exit.


Flink SQL> SELECT 'Hello World';
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.runtime.client.JobSubmissionException: Failed to submit
JobGraph.
```

Any help? Thanks.

Best,
Yik San


Re: FlinkKafkaConsumer - Broadcast - Initial Load

2021-03-26 Thread Robert Metzger
Hey Sandeep,

(Maybe this thread is also relevant:
https://lists.apache.org/thread.html/7d56267d4c2344ccb5a774896682d0a3efb38c1c215ef3500c3569a2%40%3Cuser.flink.apache.org%3E
)

> My question is how do I initialise the pipeline for the first set of
records in the database? i.e. those that are not CDC?

I would use the open() method of your process function to load the initial
data in all operator instances.
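
For illustration, the open() bootstrap could look roughly like the sketch below (the types, keys, and the way the snapshot is loaded are placeholders, not your actual job):

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

// Events and CDC records are both modeled as Tuple2<key, value> to keep the sketch small.
public class EnrichmentFunction
        extends BroadcastProcessFunction<Tuple2<String, String>, Tuple2<String, String>, String> {

    // Must be the same descriptor that is passed to .broadcast(...) on the CDC stream.
    public static final MapStateDescriptor<String, String> REF_STATE =
            new MapStateDescriptor<>("reference-data", Types.STRING, Types.STRING);

    // Bootstrap snapshot loaded once per parallel instance; broadcast CDC updates win afterwards.
    private transient Map<String, String> initialSnapshot;

    @Override
    public void open(Configuration parameters) throws Exception {
        initialSnapshot = new HashMap<>();
        // Placeholder: load the full reference table here, e.g. with a plain JDBC query.
    }

    @Override
    public void processElement(Tuple2<String, String> event,
                               ReadOnlyContext ctx,
                               Collector<String> out) throws Exception {
        String ref = ctx.getBroadcastState(REF_STATE).get(event.f0);
        if (ref == null) {
            // Fall back to the bootstrap snapshot until a CDC record has arrived for this key.
            ref = initialSnapshot.get(event.f0);
        }
        out.collect(event.f1 + " enriched with " + ref);
    }

    @Override
    public void processBroadcastElement(Tuple2<String, String> update,
                                        Context ctx,
                                        Collector<String> out) throws Exception {
        ctx.getBroadcastState(REF_STATE).put(update.f0, update.f1);
    }
}
```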

>  I am assuming that the state backend would have the latest records and
even if the task manager crashes and restarts, we will have the correct
Kafka consumer group topic offsets recorded so that the next time, we do
not “startFromEarliest”? Is that right?

Correct. The consumer offsets are stored in Flink state, and checkpointed
with all the other state. On failure, the latest stable checkpoint is
restored (which includes the Kafka offsets, where we start processing from).



On Thu, Mar 25, 2021 at 7:53 PM Sandeep khanzode 
wrote:

> Hi,
>
> I have a master/reference data that needs to come in through a
> FlinkKafkaConsumer to be broadcast to all nodes and subsequently joined
> with the actual stream for enriching content.
>
> The Kafka consumer gets CDC-type records from database changes. All this
> works well.
>
>
> My question is how do I initialise the pipeline for the first set of
> records in the database? i.e. those that are not CDC?
>
> When I deploy for the first time, I would need all the DB records to be
> sent to the FlinkKafkaConsumer before any CDC updates happen.
>
> Is there a hook that allows for the first time initial load of the records
> in the Kafka topic to be broadcast?
>
>
>
> Also, about the broadcast state, since we are persisting the state in
> RocksDB backend, I am assuming that the state backend would have the latest
> records and even if the task manager crashes and restarts, we will have the
> correct Kafka consumer group topic offsets recorded so that the next time,
> we do not “startFromEarliest”? Is that right?
>
> Will the state always maintain the updates to the records as well as the
> Kafka topic offsets?
>
>
> Thanks,
> Sandeep


flink 1.11 checkpoint size keeps growing

2021-03-26 Thread liangji
The job reads data from Kafka and writes it to MySQL.
1. Part of the code is as follows:
[inline image omitted]
2. The checkpoint charts show that the checkpoint size keeps growing, from about 400 MB to almost 2 GB within 7 days.
[inline image omitted]
Below are the details of one checkpoint; the checkpoint size between two windows is very large.
[inline image omitted]
3. The Kafka message volume over the last 7 days is as follows:
[inline image omitted]

Could anyone explain why the checkpoint size keeps growing, or suggest how to troubleshoot this?




--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Question about checkpoints and savepoints

2021-03-26 Thread Robert Metzger
Hi,

has the "state.savepoints.dir" configuration key the same value as
"state.checkpoints.dir"?
If not, can you post your configuration keys, and the invocation how you
trigger a savepoint?
Have you checked the logs? Maybe there's an error message?

On Thu, Mar 25, 2021 at 7:17 PM Robert Cullen  wrote:

> When I run a job on my Kubernetes session cluster only the checkpoint
> directories are created but not the savepoints. (Filesystem configured to
> S3 Minio)  Any ideas?
>
> --
> Robert Cullen
> 240-475-4490
>


Re: Flink on Minikube

2021-03-26 Thread Robert Metzger
Hey Sandeep,

here's a project I've recently worked on, that deploys Flink on Minikube:
https://github.com/rmetzger/flink-reactive-mode-k8s-demo
The project is pretty big, but I guess you can pick the bits related to the
Flink deployment on minikube.

On Thu, Mar 25, 2021 at 7:48 PM Sandeep khanzode 
wrote:

> Hi Arvid,
>
> Thanks, will set the scope to Provided and try.
>
> Are there public examples in GitHub that demonstrate a sample app in
> Minikube?
>
> Sandeep
>
> On 23-Mar-2021, at 3:17 PM, Arvid Heise  wrote:
>
> Hi Sandeep,
>
> please have a look at [1], you should add most Flink dependencies as
> provided - exceptions are connectors (or in general stuff that is not in
> flink/lib/ or flink/plugins).
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/project-configuration.html#setting-up-a-project-basic-dependencies
>
> On Tue, Mar 23, 2021 at 5:28 AM Sandeep khanzode 
> wrote:
>
>> Hi Arvid,
>>
>> I copy the JAR to the usrlib folder. This works in the Cloud EKS cluster.
>> I wanted to set this up for my testing purposes.
>>
>> Below is the Dockerfile:
>>
>> FROM apache/flink:1.12.1-java11
>> RUN mv /opt/flink/opt/flink-queryable-state-runtime_2.12-1.12.1.jar 
>> /opt/flink/lib/flink-queryable-state-runtime_2.12-1.12.1.jar
>> ADD myJar.jar /opt/flink/usrlib/myJar.jar
>>
>>
>> … But, in my process, this is a Fat JAR created by the Maven Shade
>> Plugin. Are you saying that all Flink classes should not be part of the
>> user JAR? How does that work? Do we set the scope of the dependencies to
>> compile (or, not runtime) for Flink Jars? Do we have any samples/examples
>> that shows this? Would be really helpful.
>>
>>
>> On 22-Mar-2021, at 8:00 PM, Arvid Heise  wrote:
>>
>> Hi Sandeep,
>>
>> The first error definitively indicates a classloading issue, which may
>> also be the cause for the second error.
>>
>> Can you describe where you put your jar inside the docker image and which
>> execution mode you are using? As a general rule, the jar is not supposed to
>> go into flink/lib.
>> Also make sure to never shade non-connector classes of Flink into your
>> jar. A typical user jar should be ~1MB.
>>
>> On Fri, Mar 19, 2021 at 8:58 PM Sandeep khanzode 
>> wrote:
>>
>>> Hello,
>>>
>>> I have a fat JAR compiled using the Maven Shade plugin and everything
>>>  works correctly when I deploy it on a standalone local cluster i.e. one
>>> job and one task manager node.
>>>
>>> But I installed Minikube and the same JAR file packaged into a docker
>>> image fails with weird serialization  errors:
>>>
>>> Caused by: java.lang.ClassCastException: cannot assign instance of
>>> java.lang.invoke.SerializedLambda to field
>>> org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.keySelector
>>> of type org.apache.flink.api.java.functions.KeySelector in instance of
>>> org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner
>>>
>>>
>>> … or in certain cases, if I comment out everything except the Kafka
>>> Source, then ...
>>> Caused by: java.lang.NoClassDefFoundError: Could not initialize class
>>> org.apache.kafka.common.requests.MetadataRequest$Builder
>>>
>>>
>>> Is there anything I am missing with the Minikube setup? I initially
>>> tried with the steps for the Job Application cluster on the website, but I
>>> was unable to get the /usrlib mounted from the hostpath.
>>>
>>>
>>> So, I created a simple docker image from ...
>>>
>>> apache/flink:1.12.1-java11
>>>
>>>
>>> But I have not had any success getting the same job to run here. Please
>>> let me know if there are well-known steps or issues that I can check.
>>>
>>> Thanks,
>>> Sandeep
>>>
>>
>>
>


Re: Flink job repeated restart failure

2021-03-26 Thread Arvid Heise
Hi Vinaya,

java.io.tmpdir is already the fallback and I'm not aware of another level
of fallback.

Ensuring java.io.tmpdir is valid is also relevant for some third-party
libraries that rely on it (e.g. FileSystem that cache local files). It's
good practice to set that appropriately.

On Fri, Mar 26, 2021 at 6:32 AM vinaya  wrote:

> Hi Arvid,
>
> Thank you for the suggestion.
>
> Indeed, the specified setting was commented out in the Flink configuration
> (flink-conf.yaml).
>
>   # io.tmp.dirs: /tmp
>
> Is there a fallback (e.g. /tmp) if io.tmp.dirs and
> System.getProperty("java.io.tmpdir") are both not set?
>
> Will configure this setting to a valid value as suggested.
>
> Kind regards,
> Vinaya
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: pipeline.auto-watermark-interval vs setAutoWatermarkInterval

2021-03-26 Thread Matthias Pohl
Thanks for double-checking Dawid and thanks for clarifying, Jark. I will
leave the Jira issue open as Jark suggested improving the documentation in
that sense.

Best,
Matthias

On Fri, Mar 26, 2021 at 7:43 AM Jark Wu  wrote:

> IIUC, pipeline.auto-watermark-interval = 0 just disables **periodic**
> watermark emission,
>  it doesn't mean the watermark will never be emitted.
> In Table API/SQL, it has the same meaning. If watermark interval = 0, we
> disable periodic watermark emission,
> and emit watermark once it advances.
>
> So I think the SQL documentation is correct.
>
> Best,
> Jark
>
> On Tue, 23 Mar 2021 at 22:29, Dawid Wysakowicz 
> wrote:
>
>> Hey,
>>
>> I would like to double check this with Jark and/or Timo. As far as
>> DataStream is concerned the javadoc is correct. Moreover the
>> pipeline.auto-watermark-interval and setAutoWatermarkInterval are
>> effectively the same setting/option. However I am not sure if Table API
>> interprets it in the same way as DataStream APi. The documentation you
>> linked, Aeden, describes the SQL API.
>>
>> @Jark @Timo Could you verify if the SQL documentation is correct?
>>
>> Best,
>>
>> Dawid
>> On 23/03/2021 15:20, Matthias Pohl wrote:
>>
>> Hi Aeden,
>> sorry for the late reply. I looked through the code and verified that the
>> JavaDoc is correct. Setting pipeline.auto-watermark-interval to 0 will
>> disable the automatic watermark generation. I created FLINK-21931 [1] to
>> cover this.
>>
>> Thanks,
>> Matthias
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-21931
>>
>> On Thu, Mar 4, 2021 at 9:53 PM Aeden Jameson 
>> wrote:
>>
>>> Correction: The first link was supposed to be,
>>>
>>> 1. pipeline.auto-watermark-interval
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#pipeline-auto-watermark-interval
>>>
>>> On Wed, Mar 3, 2021 at 7:46 PM Aeden Jameson 
>>> wrote:
>>> >
>>> > I'm hoping to have my confusion clarified regarding the settings,
>>> >
>>> > 1. pipeline.auto-watermark-interval
>>> >
>>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/ExecutionConfig.html#setAutoWatermarkInterval-long-
>>> >
>>> > 2. setAutoWatermarkInterval
>>> >
>>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/ExecutionConfig.html#setAutoWatermarkInterval-long-
>>> >
>>> > I noticed the default value of pipeline.auto-watermark-interval is 0
>>> > and according to these docs,
>>> >
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/create.html#watermark
>>> ,
>>> > it states, "If watermark interval is 0ms, the generated watermarks
>>> > will be emitted per-record if it is not null and greater than the last
>>> > emitted one." However in the documentation for
>>> > setAutoWatermarkInterval the value 0 disables watermark emission.
>>> >
>>> > * Are they intended to be the same setting? If not how are they
>>> > different? Is one for FlinkSql and the other DataStream API?
>>> >
>>> > --
>>> > Thank you,
>>> > Aeden
>>
>>


Re: flink sql count distinct optimization

2021-03-26 Thread Jark Wu
> If it is not a window agg, Flink will automatically split the aggregation once the parameter is enabled, right?
Yes.

> As for window agg not supporting automatic splitting, can that be found in the documentation?
It is not mentioned in the documentation. Everything described in that document [1] targets the optimization of unbounded aggregations.
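
For illustration, a minimal sketch of enabling the split-distinct options programmatically (the SQL itself stays unchanged; everything else in the job is omitted):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SplitDistinctConfig {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build());

        // Equivalent to the two SET statements; the optimizer then rewrites
        // COUNT(DISTINCT ...) for unbounded (non-window) aggregations automatically.
        tEnv.getConfig().getConfiguration()
                .setString("table.optimizer.distinct-agg.split.enabled", "true");
        tEnv.getConfig().getConfiguration()
                .setString("table.optimizer.distinct-agg.split.bucket-num", "1024");

        // The original query stays as-is, e.g.:
        // SELECT day, COUNT(DISTINCT buy_id) AS cnt FROM T GROUP BY day
    }
}
```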

Best,
Jark

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tuning/streaming_aggregation_optimization.html#split-distinct-aggregation

On Fri, 26 Mar 2021 at 11:00, guomuhua <663021...@qq.com> wrote:

> Jark wrote
> > I see that your job uses a window agg; currently window agg does not yet support
> > automatic splitting. The window agg based on window TVF in 1.13 supports this parameter, so stay tuned.
> >
> > Best,
> > Jark
> >
> > On Wed, 24 Mar 2021 at 19:29, Robin Zhang 
>
> > vincent2015qdlg@
>
> > 
> > wrote:
> >
> >> Hi guomuhua,
> >>   With local aggregation enabled, you do not need to split the key and do a two-level aggregation yourself. I suggest taking a look at the official documentation.
> >>
> >> Best,
> >> Robin
> >>
> >>
> >> guomuhua wrote
> >> > In SQL, if the local-global parameter is enabled: set
> >> > table.optimizer.agg-phase-strategy=TWO_PHASE;
> >> > or the Partial-Final parameters are enabled: set
> >> > table.optimizer.distinct-agg.split.enabled=true;
> >> > set
> >> > table.optimizer.distinct-agg.split.bucket-num=1024;
> >> > do I still need to rewrite the SQL into a two-level form accordingly?
> >> > For example:
> >> > Original SQL:
> >> > SELECT day, COUNT(DISTINCT buy_id) as cnt FROM T GROUP BY day
> >> >
> >> > After splitting the DISTINCT column buy_id by MOD 1024, the SQL would be:
> >> > SELECT day, SUM(cnt) total
> >> > FROM (
> >> > SELECT day, MOD(buy_id, 1024), COUNT(DISTINCT buy_id) as cnt
> >> > FROM T GROUP BY day, MOD(buy_id, 1024))
> >> > GROUP BY day
> >> >
> >> > Or will Flink rewrite the SQL for me automatically, so that I do not need to care?
> >> >
> >> > Also, if I only enable the parameters above without rewriting the SQL, it does not seem to be optimized, and I do not see two-phase operators in the Flink web UI either.
> >> > [operator graph screenshot: http://apache-flink.147419.n8.nabble.com/file/t1346/%E7%AE%97%E5%AD%90.png ]
> >>
> >> >
> >> >
> >> >
> >> >
> >> >
> >> > --
> >> > Sent from: http://apache-flink.147419.n8.nabble.com/
> >>
> >>
> >>
> >>
> >>
> >> --
> >> Sent from: http://apache-flink.147419.n8.nabble.com/
> >>
>
> Thanks. So if it is not a window agg, Flink will automatically split the aggregation once the parameter is enabled, right? As for window agg
> not supporting automatic splitting, can this be found in the documentation? Where exactly? Or do I need to look into the source code? Please advise. Thanks again.
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: pipeline.auto-watermark-interval vs setAutoWatermarkInterval

2021-03-26 Thread Jark Wu
IIUC, pipeline.auto-watermark-interval = 0 just disables **periodic**
watermark emission,
 it doesn't mean the watermark will never be emitted.
In Table API/SQL, it has the same meaning. If watermark interval = 0, we
disable periodic watermark emission,
and emit watermark once it advances.
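
For illustration, on the DataStream side the per-record behaviour corresponds roughly to a punctuated generator like this sketch (the class name is only an example):

```java
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;

// Emits a new watermark as soon as it advances, independent of the periodic
// timer that pipeline.auto-watermark-interval controls.
public class PerRecordWatermarks<T> implements WatermarkGenerator<T> {

    private long maxTimestamp = Long.MIN_VALUE;

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        if (eventTimestamp > maxTimestamp) {
            maxTimestamp = eventTimestamp;
            output.emitWatermark(new Watermark(maxTimestamp));
        }
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // Nothing to do when the periodic interval is disabled (set to 0).
    }
}
```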

So I think the SQL documentation is correct.

Best,
Jark

On Tue, 23 Mar 2021 at 22:29, Dawid Wysakowicz 
wrote:

> Hey,
>
> I would like to double check this with Jark and/or Timo. As far as
> DataStream is concerned the javadoc is correct. Moreover the
> pipeline.auto-watermark-interval and setAutoWatermarkInterval are
> effectively the same setting/option. However I am not sure if Table API
> interprets it in the same way as DataStream APi. The documentation you
> linked, Aeden, describes the SQL API.
>
> @Jark @Timo Could you verify if the SQL documentation is correct?
>
> Best,
>
> Dawid
> On 23/03/2021 15:20, Matthias Pohl wrote:
>
> Hi Aeden,
> sorry for the late reply. I looked through the code and verified that the
> JavaDoc is correct. Setting pipeline.auto-watermark-interval to 0 will
> disable the automatic watermark generation. I created FLINK-21931 [1] to
> cover this.
>
> Thanks,
> Matthias
>
> [1] https://issues.apache.org/jira/browse/FLINK-21931
>
> On Thu, Mar 4, 2021 at 9:53 PM Aeden Jameson 
> wrote:
>
>> Correction: The first link was supposed to be,
>>
>> 1. pipeline.auto-watermark-interval
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#pipeline-auto-watermark-interval
>>
>> On Wed, Mar 3, 2021 at 7:46 PM Aeden Jameson 
>> wrote:
>> >
>> > I'm hoping to have my confusion clarified regarding the settings,
>> >
>> > 1. pipeline.auto-watermark-interval
>> >
>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/ExecutionConfig.html#setAutoWatermarkInterval-long-
>> >
>> > 2. setAutoWatermarkInterval
>> >
>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/ExecutionConfig.html#setAutoWatermarkInterval-long-
>> >
>> > I noticed the default value of pipeline.auto-watermark-interval is 0
>> > and according to these docs,
>> >
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/create.html#watermark
>> ,
>> > it states, "If watermark interval is 0ms, the generated watermarks
>> > will be emitted per-record if it is not null and greater than the last
>> > emitted one." However in the documentation for
>> > setAutoWatermarkInterval the value 0 disables watermark emission.
>> >
>> > * Are they intended to be the same setting? If not how are they
>> > different? Is one for FlinkSql and the other DataStream API?
>> >
>> > --
>> > Thank you,
>> > Aeden
>
>