Flink Stateful Functions 3.4

2024-06-17 Thread L. Jiang
Hi there,
Does anyone know which Flink version Flink Stateful Functions 3.4 is
compatible with?
https://nightlies.apache.org/flink/flink-statefun-docs-master/docs/deployment/state-bootstrap/

I know Stateful Functions 3.3 is compatible with Flink 1.16.2, and Stateful
Functions 3.2 works with Flink 1.14.3 and earlier, but I couldn't find this
information for Stateful Functions 3.4. I hope it works with at least Flink
1.18.

Thanks,

-- 
Best,
Liangjun




RE: aws zeppelin glue table connector

2023-03-09 Thread Katz, David L via user
Hi-

I don't see a response to this, so I'm taking one last stab. Any reply would be
greatly appreciated.

Thanks,
-Dave

David L Katz (he/him/his) | Executive Director | Macro Technology | 545 Washington Blvd, Floor 4 | Jersey City, NJ 07310

From: Katz, David L (CIB Tech, USA)
Sent: Friday, March 03, 2023 11:25 AM
To: user 
Subject: aws zeppelin glue table connector

Hi-

I'm hoping someone can give me a better understanding of, and a sense of timing for,
Flink leveraging Glue tables. Specifically, I would like to use a Glue table as a
data source from an AWS-hosted Zeppelin notebook (instead of referencing a
Kinesis data stream or S3 bucket directly). Additionally, I will need this to work
for a Lake Formation Glue database (i.e. the underlying data is in a
different AWS account). I believe this JIRA may cover it, but I'm not sure all of
that is in scope:
https://issues.apache.org/jira/browse/FLINK-29549

Questions that come to mind:

  1.  Is the full data flow I outlined above expected to be supported by that
JIRA?
  2.  Do we know when the connector will be released?
  3.  Do we have a general sense of when AWS will make that version of Flink
available as a runtime?

Thanks in advance!
-Dave




aws zeppelin glue table connector

2023-03-03 Thread Katz, David L via user
Hi-

I'm hoping someone can give me a better understanding of, and a sense of timing for,
Flink leveraging Glue tables. Specifically, I would like to use a Glue table as a
data source from an AWS-hosted Zeppelin notebook (instead of referencing a
Kinesis data stream or S3 bucket directly). Additionally, I will need this to work
for a Lake Formation Glue database (i.e. the underlying data is in a
different AWS account). I believe this JIRA may cover it, but I'm not sure all of
that is in scope:
https://issues.apache.org/jira/browse/FLINK-29549

Questions that come to mind:

  1.  Is the full data flow I outlined above expected to be supported by that
JIRA?
  2.  Do we know when the connector will be released?
  3.  Do we have a general sense of when AWS will make that version of Flink
available as a runtime?

Thanks in advance!
-Dave




RE: [EXTERNAL]Re: aws glue connector

2022-12-20 Thread Katz, David L via user
Hi Danny-

Thanks for your response!

At first glance, I'm not sure; I will drill in to look deeper. For
clarification, a few points I probably should have expressed initially:

  1.  I'm running Flink as a Kinesis Data Analytics application (an AWS serverless
application).
  2.  The reason I want to use streaming (and also static) Glue tables instead
of the underlying Kinesis data stream (or S3 buckets) as a source is that I
will be leveraging multiple AWS accounts, and this is our security strategy (as
opposed to using cross-account access).
  3.  In other words, I might have a static Glue table backed by an S3
bucket in one account and a streaming Glue table backed by a Kinesis data
stream in another account, and I want to join the two as part of a streaming
application by just going to the central lake database as a consumer.

Thanks again!
-Dave


From: Danny Cranmer 
Sent: Tuesday, December 20, 2022 3:12 PM
To: Katz, David L (CIB Tech, USA) 
Cc: user 
Subject: [EXTERNAL]Re: aws glue connector

Hello David,

There is a FLIP [1] to add native Glue Catalog support, and we already have Glue
Schema Registry format plugins [2][3]; however, these are DataStream API only.
Are you intending to use just the Glue schema features, or to leverage other
features as well? Would either of the things I mentioned help with your use case?

Thanks,
Danny

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-277%3A+Native+GlueCatalog+Support+in+Flink
[2] https://github.com/apache/flink/tree/master/flink-formats/flink-avro-glue-schema-registry
[3] https://github.com/apache/flink/tree/master/flink-formats/flink-json-glue-schema-registry
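For reference, a minimal sketch of what consuming Glue-Schema-Registry-encoded Avro from Kafka with the format plugin in [2] might look like on the DataStream API. The class and factory-method names are taken from the flink-avro-glue-schema-registry module, but treat the exact signatures as assumptions to verify against your Flink version; the topic, brokers, and reader schema are placeholders, and the registry configuration map must be filled in per the AWS Glue Schema Registry client documentation.

```
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.avro.glue.schema.registry.GlueSchemaRegistryAvroDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class GlueSchemaRegistrySourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Reader schema for the Avro records; placeholder schema for illustration only.
        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Order\",\"fields\":"
                + "[{\"name\":\"id\",\"type\":\"string\"}]}");

        // Glue Schema Registry client configuration (region, registry name, ...);
        // the required keys come from the AWS Glue Schema Registry client library.
        Map<String, Object> registryConfigs = new HashMap<>();

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "broker:9092"); // placeholder
        kafkaProps.setProperty("group.id", "glue-sr-demo");

        DataStream<GenericRecord> records = env.addSource(new FlinkKafkaConsumer<>(
                "orders",                                                        // placeholder topic
                GlueSchemaRegistryAvroDeserializationSchema.forGeneric(schema, registryConfigs),
                kafkaProps));

        records.print();
        env.execute("glue-schema-registry-source");
    }
}
```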

On Mon, Dec 19, 2022 at 2:31 PM Katz, David L via user <user@flink.apache.org> wrote:
Hi-

Has anyone used a glue table connector (specifically trying to get a streaming 
glue table that sits on top of a kinesis data stream)?  I have been using the 
kinesis stream connector but want to integrate with lake formation so would 
like to take advantage of the glue table layer.

Thanks,
-Dave



aws glue connector

2022-12-19 Thread Katz, David L via user
Hi-

Has anyone used a glue table connector (specifically trying to get a streaming 
glue table that sits on top of a kinesis data stream)?  I have been using the 
kinesis stream connector but want to integrate with lake formation so would 
like to take advantage of the glue table layer.

Thanks,
-Dave



RE: [EXTERNAL]multiple s3 sinks

2022-10-19 Thread Katz, David L via user
Hi-

Never mind. The error must have been from shutting down the prior version of the
app (the timestamp was after the restart); the issue is now resolved.

Thanks,
-Dave


From: Katz, David L via user 
Sent: Wednesday, October 19, 2022 11:46 AM
To: user 
Subject: [EXTERNAL]multiple s3 sinks

Hi-

I'm getting the following error when trying to write to 3 s3 sinks (in an aws 
kinesis java app):

Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot have more than 
one execute() or executeAsync() call in a single environment.

Code snippet (s3Sum, s3Agg, and s3Det are the table-creation SQL strings for the
3 sink tables):

tableEnv.executeSql(s3Sum);
tableEnv.executeSql(s3Agg);
tableEnv.executeSql(s3Det);
StatementSet stmtSet = tableEnv.createStatementSet();
stmtSet.addInsert("sink_table_sum", resultsSum);
stmtSet.addInsert("sink_table_agg", resultsAgg);
stmtSet.addInsert("sink_table_det", resultsDet);
stmtSet.execute();

I read that switching to a StatementSet is the way to go, but I'm still missing 
something.  Any help would be greatly appreciated!

Thanks,
-Dave




multiple s3 sinks

2022-10-19 Thread Katz, David L via user
Hi-

I'm getting the following error when trying to write to 3 s3 sinks (in an aws 
kinesis java app):

Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot have more than 
one execute() or executeAsync() call in a single environment.

Code snippet (s3Sum, s3Agg, and s3Det are the table-creation SQL strings for the
3 sink tables):

tableEnv.executeSql(s3Sum);
tableEnv.executeSql(s3Agg);
tableEnv.executeSql(s3Det);
StatementSet stmtSet = tableEnv.createStatementSet();
stmtSet.addInsert("sink_table_sum", resultsSum);
stmtSet.addInsert("sink_table_agg", resultsAgg);
stmtSet.addInsert("sink_table_det", resultsDet);
stmtSet.execute();

I read that switching to a StatementSet is the way to go, but I'm still missing 
something.  Any help would be greatly appreciated!

Thanks,
-Dave
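For context, a minimal, self-contained sketch of the pattern StatementSet is meant to enable: executeSql() is used for DDL only (which does not launch a job), every INSERT goes into one StatementSet, and stmtSet.execute() is the single submission in the program. The "Cannot have more than one execute() or executeAsync() call" error appears when something else also triggers an execution (here, as it turned out, a previous run of the app). The table names, schemas, and the datagen/print connectors below are placeholders, not the original job's tables.

```
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableEnvironment;

public class MultiSinkSketch {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // executeSql() for DDL only registers the tables; it does not start a job.
        tableEnv.executeSql(
                "CREATE TABLE src (id INT, v DOUBLE) WITH ('connector' = 'datagen')");
        tableEnv.executeSql(
                "CREATE TABLE sink_a (id INT, v DOUBLE) WITH ('connector' = 'print')");
        tableEnv.executeSql(
                "CREATE TABLE sink_b (id INT, v DOUBLE) WITH ('connector' = 'print')");

        // All INSERTs are grouped into one StatementSet ...
        StatementSet stmtSet = tableEnv.createStatementSet();
        stmtSet.addInsertSql("INSERT INTO sink_a SELECT id, v FROM src");
        stmtSet.addInsertSql("INSERT INTO sink_b SELECT id, v * 2 FROM src");

        // ... and this is the single submission in the whole program. A separate
        // execute()/executeAsync() call elsewhere triggers the
        // "Cannot have more than one execute() ..." error.
        stmtSet.execute();
    }
}
```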




Re: Flink 1.12.8 release

2022-02-16 Thread Joey L
Hey Martin,

Thanks for the response. That's unfortunate; I assumed there would be a
1.12.8 release, since many Flink issues in JIRA are marked with `Fix
Versions: 1.12.8` and I can see that there are many unreleased commits on the
release-1.12 branch.

Any chance that they will be released at all?

Regards,
J

On Wed, 16 Feb 2022 at 19:39, Martijn Visser  wrote:

> Hi Joey,
>
> Since the Flink community only supports the latest and previous minor
> release [1] (currently Flink 1.14 and 1.13), I'm not expecting another
> release of Flink 1.12.
>
> Best regards,
>
> Martijn Visser
> https://twitter.com/MartijnVisser82
>
> [1] https://flink.apache.org/downloads.html#update-policy-for-old-releases
>
>
> On Wed, 16 Feb 2022 at 08:54, Joey L  wrote:
>
>> Hi,
>>
>> Is there a planned release date for 1.12.8 or scheduled release cycle for
>> minor versions?
>>
>> Regards,
>> J
>>
>


Flink 1.12.8 release

2022-02-15 Thread Joey L
Hi,

Is there a planned release date for 1.12.8 or scheduled release cycle for
minor versions?

Regards,
J


Re: Kubernetes HA: New jobs stuck in Initializing for a long time after a certain number of existing jobs are running

2021-11-22 Thread Joey L
Hi Matthias,

Thanks for the response. I actually found the root issue a while after
posting the question, and it is related to this JIRA ticket:
https://issues.apache.org/jira/browse/FLINK-22006

It appears to be a limit on how many concurrent requests the Kubernetes client
can make (which caps the ConfigMap watches), and adding this to my config worked.

```
containerized.master.env.KUBERNETES_MAX_CONCURRENT_REQUESTS: 300
env.java.opts.jobmanager: "-Dkubernetes.max.concurrent.requests=300"
```

Thanks,
Joey

On Tue, 23 Nov 2021 at 00:19, Matthias Pohl  wrote:

> Hi Joey,
> that looks like a cluster configuration issue. The 192.168.100.79:6123 is
> not accessible from the JobManager pod (see line 1224f in the provided JM
> logs):
>2021-11-19 04:06:45,049 WARN
>  akka.remote.transport.netty.NettyTransport   [] - Remote
> connection to [null] failed with java.net.NoRouteToHostException: No route
> to host
>2021-11-19 04:06:45,067 WARN  akka.remote.ReliableDeliverySupervisor
> [] - Association with remote system [akka.tcp://
> flink@192.168.100.79:6123] has failed, address is now gated for [50] ms.
> Reason: [Association failed with [akka.tcp://flink@192.168.100.79:6123]]
> Caused by: [java.net.NoRouteToHostException: No route to host]
>
> The TaskManagers are able to communicate with the JobManager pod and are
> properly registered. The JobMaster, instead, tries to connect to the
> ResourceManager (both running on the JobManager pod) but fails.
> SlotRequests are triggered but never actually fulfilled. They are put in
> the queue for pending SlotRequests. The timeout kicks in after trying to
> reach the ResourceManager for some time. That's
> the NoResourcesAvailableException you are experiencing.
>
> Matthias
>
> On Fri, Nov 19, 2021 at 7:02 AM Joey L  wrote:
>
>> Hi,
>>
>> I've set up a Flink 1.12.5 session cluster running on K8s with HA, and
>> came across an issue with creating new jobs once the cluster has reached 20
>> existing jobs. The first 20 jobs always gets initialized and start running
>> within 5 - 10 seconds.
>>
>> Any new job submission is stuck in Initializing state for a long time (10
>> - 30 mins), and eventually it goes to Running but the tasks are stuck in
>> Scheduled state despite there being free task slots available. The
>> Scheduled jobs will eventually start running, but the delay could be up to
>> an hour. Interestingly, this issue doesn't occur once I remove the HA
>> config.
>>
>> Each task manager is configured to have 4 task slots, and I can see via
>> the Flink UI that the task managers are registered correctly. (Refer to
>> attached screenshot).
>>
>> [image: Screen Shot 2021-11-19 at 3.08.11 pm.png]
>>
>> In the logs, I can see that jobs stuck in Scheduled throw this exception
>> after 5 minutes (eventhough there are slots available):
>>
>> ```
>> java.util.concurrent.CompletionException:
>> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
>> Slot request bulk is not fulfillable! Could not allocate the required slot
>> within slot request timeout
>> ```
>>
>> I've also attached the full job manager logs below.
>>
>> Any help/guidance would be appreciated.
>>
>> Thanks,
>> Joey
>>
>


Re: NoClassDefFoundError for RMIServerImpl when running Flink with Scala 2.12 and Java 11

2021-11-02 Thread L. C. Hsieh
Hi Nicolaus,

Thanks for your reply. It turned out to be the Java distribution used
in the base image.
I changed the base image and it works now.

On Tue, Nov 2, 2021 at 10:14 AM Nicolaus Weidner
 wrote:
>
> Hi,
>
> I tried building Flink 1.13 with the Scala 2.12 profile and running some of 
> the examples with Java 11, without encountering the issue you describe (with 
> or without HA). Can you give more details on how exactly you built Flink 
> (ideally the full command), and how you ran the job?
>
> Best,
> Nico
>
> On Thu, Oct 28, 2021 at 10:42 AM L. C. Hsieh  wrote:
>>
>> Hi Flink developers,
>>
>> I encountered a weird error like follows. It only happens when I build Flink 
>> with Scala 2.12 profile and run with Java 11. No such error for Scala 2.11 + 
>> Java 8/11.
>>
>> Just search for relative info, but don't find any useful. Do you have any 
>> idea about this? Thanks.
>>
>> java.lang.NoClassDefFoundError: javax/management/remote/rmi/RMIServerImpl
>> at 
>> org.apache.flink.runtime.management.JMXService.getPort(JMXService.java:76) 
>> ~[flink-dist_2.12-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
>> at 
>> org.apache.flink.runtime.taskexecutor.TaskExecutor.connectToResourceManager(TaskExecutor.java:1300)
>>  ~[flink-dist_2.12-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
>> at 
>> org.apache.flink.runtime.taskexecutor.TaskExecutor.tryConnectToResourceManager(TaskExecutor.java:1284)
>>  ~[flink-dist_2.12-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
>> at 
>> org.apache.flink.runtime.taskexecutor.TaskExecutor.reconnectToResourceManager(TaskExecutor.java:1279)
>>  ~[flink-dist_2.12-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
>> at 
>> org.apache.flink.runtime.taskexecutor.TaskExecutor.notifyOfNewResourceManagerLeader(TaskExecutor.java:1258)
>>  ~[flink-dist_2.12-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
>> at 
>> org.apache.flink.runtime.taskexecutor.TaskExecutor.access$800(TaskExecutor.java:181)
>>  ~[flink-dist_2.12-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
>> at 
>> org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerLeaderListener.lambda$notifyLeaderAddress$0(TaskExecutor.java:2150)
>>  ~[flink-dist_2.12-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]


NoClassDefFoundError for RMIServerImpl when running Flink with Scala 2.12 and Java 11

2021-10-28 Thread L . C . Hsieh
Hi Flink developers,

I encountered a weird error, shown below. It only happens when I build Flink
with the Scala 2.12 profile and run with Java 11. There is no such error for Scala 2.11 +
Java 8/11.

I searched for related info but didn't find anything useful. Do you have any idea
about this? Thanks.

java.lang.NoClassDefFoundError: javax/management/remote/rmi/RMIServerImpl
at 
org.apache.flink.runtime.management.JMXService.getPort(JMXService.java:76) 
~[flink-dist_2.12-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.connectToResourceManager(TaskExecutor.java:1300)
 ~[flink-dist_2.12-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.tryConnectToResourceManager(TaskExecutor.java:1284)
 ~[flink-dist_2.12-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.reconnectToResourceManager(TaskExecutor.java:1279)
 ~[flink-dist_2.12-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.notifyOfNewResourceManagerLeader(TaskExecutor.java:1258)
 ~[flink-dist_2.12-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.access$800(TaskExecutor.java:181)
 ~[flink-dist_2.12-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerLeaderListener.lambda$notifyLeaderAddress$0(TaskExecutor.java:2150)
 ~[flink-dist_2.12-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]


Re: NoClassDefFoundError if Kafka classes are assemblied in application jar

2021-10-18 Thread L. C. Hsieh
Hi Arvid,

Alexander Preuß has already replied to me, and I also found a discussion at
https://stackoverflow.com/questions/51479657/flinkkafkaconsumer011-not-found-on-flink-cluster.

So by following
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/project-configuration/#adding-connector-and-library-dependencies,
that is, removing the "provided" dependency scope and letting Maven include the
connector in the fat jar, it works.

So, if we do not put the connector jar under Flink's /lib but instead include
it in the fat jar of the application, everything is fine.

I have replied to Alexander with a further question, as follows:

But I am also curious how to explain this. The information you pointed out
says doing it that way is recommended, but it does not say the jar "cannot be
put into /lib".
As I mentioned at the beginning, Flink does "child-first" resolution
on classes, so why can't Flink find the Kafka classes in the application jar
if the connector jar is under /lib?
It is pretty confusing from a user's point of view.
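One way to investigate this kind of question is to log, from inside the job, which classloader actually serves the class and where the class file comes from; that shows directly whether it was resolved from the fat jar (the child, user-code classloader) or from a jar in /lib (the parent classpath). A small sketch using only generic Java plus a RichMapFunction, with the class name taken from the stack trace in this thread:

```
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Drop this into the pipeline so open() runs with the task's user-code
// classloader and logs where the Kafka class is actually loaded from.
public class ClassLoaderProbe extends RichMapFunction<String, String> {

    @Override
    public void open(Configuration parameters) throws Exception {
        Class<?> clazz = Class.forName(
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        System.out.println(clazz.getName()
                + " loaded by " + clazz.getClassLoader()
                + " from " + clazz.getProtectionDomain().getCodeSource());
    }

    @Override
    public String map(String value) {
        return value;
    }
}
```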

On Mon, Oct 18, 2021 at 2:01 AM Arvid Heise  wrote:
>
> This looks very odd. How do you create the fat jar? What's your Flink version?
>
> I don't think this is a general Flink issue or else no one would be able to 
> read from Kafka at all.
>
> On Fri, Oct 15, 2021 at 4:16 AM L. C. Hsieh  wrote:
>>
>> Hi, Flink developers,
>>
>> Does anyone encounter the following error?
>>
>> java.lang.NoClassDefFoundError: 
>> org/apache/kafka/common/serialization/ByteArrayDeserializer
>> at 
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.setDeserializer(FlinkKafkaConsumer.java:322)
>> at 
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.(FlinkKafkaConsumer.java:223)
>> at 
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.(FlinkKafkaConsumer.java:154)
>> at 
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.(FlinkKafkaConsumer.java:139)
>> at 
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.(FlinkKafkaConsumer.java:108)
>>
>> If I put kafka-clients jar into Flink's lib/, Flink can find it. But if I 
>> assembly it into the application jar, Flink cannot find it. But based on 
>> what I read from Flink doc, Flink does "child-first" resolution on classes. 
>> Why it cannot find kafka classes if they are in application jar??
>>
>> I examined the application jar content. It includes these kafka classes 
>> actually.
>>
>> I tested it with K8S session and job clusters on Flink built from current 
>> source. Both have the same error.


NoClassDefFoundError if Kafka classes are assemblied in application jar

2021-10-14 Thread L . C . Hsieh
Hi, Flink developers,

Has anyone encountered the following error?

java.lang.NoClassDefFoundError: 
org/apache/kafka/common/serialization/ByteArrayDeserializer
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.setDeserializer(FlinkKafkaConsumer.java:322)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:223)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:154)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:139)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:108)

If I put the kafka-clients jar into Flink's lib/, Flink can find it. But if I
assemble it into the application jar, Flink cannot find it. Based on what I
read in the Flink docs, Flink does "child-first" resolution on classes, so why
can't it find the Kafka classes if they are in the application jar?

I examined the application jar's contents. It does include these Kafka classes.

I tested it with K8s session and job clusters on Flink built from current
source. Both show the same error.


Re: Got NoClassDefFoundError on Flink native K8S cluster

2021-08-22 Thread L . C . Hsieh


I also noticed that the Flink s3-hadoop plugin has a Hadoop common dependency. I'm
trying that.

From the logs, the plugin is enabled:

Enabling required built-in plugins
Linking flink-s3-fs-hadoop-1.12-SNAPSHOT.jar to plugin directory
Successfully enabled flink-s3-fs-hadoop-1.12-SNAPSHOT.jar

But I also got:

java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configurable at
java.net.URLClassLoader.findClass(URLClassLoader.java:382) at
java.lang.ClassLoader.loadClass(ClassLoader.java:418) at
org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63)

So it looks like user code cannot use the plugin's classes (e.g. the Hadoop classes)?

I don't see hadoop-common being shaded into the plugin.


On 2021/08/22 18:24:24, L. C. Hsieh  wrote: 
> 
> As I know, flink-shaded-hadoop is not officially supported since Flink 1.11 
> (https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/hadoop.html).
> 
> Anyway, I installed Hadoop common package into the docker images to make 
> Flink happy. I marked the hadoop dependencies in the iceberg-test application 
> as "provided". Looks okay as it won't throw the previous LinkageError anymore.
> 
> But a new issue is, the application submitted cannot be run because the task 
> manager pod is failed to run. There is warning message by describing pod 
> status:
> 
>   Warning  FailedMount  4m4s  kubeletUnable to 
> attach or mount volumes: unmounted volumes=[hadoop-config-volume], unattached 
> volumes=[flink-config-volume kube-api-access-772x5 hadoop-config-volume]: 
> timed out waiting for the condition
>   Warning  FailedMount  108s (x2 over 6m22s)  kubeletUnable to 
> attach or mount volumes: unmounted volumes=[hadoop-config-volume], unattached 
> volumes=[hadoop-config-volume flink-config-volume kube-api-access-772x5]: 
> timed out waiting for the condition
>   Warning  FailedMount  11s (x12 over 8m25s)  kubelet
> MountVolume.SetUp failed for volume "hadoop-config-volume" : configmap 
> "hadoop-config-my-first-flink-cluster" not found
> 
> Seems it cannot mount "hadoop-config-volume".
> 
> From the doc 
> (https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/),
>  it looks like that Flink will add some internal ConfigMap volumes 
> automatically. So again, I am not sure what is wrong in above steps...
> 
> 
> On 2021/08/22 10:01:25, Manong Karl  wrote: 
> > I prefer using flink bundled hadoop, such as
> > https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar.
> > May help.
> > 
> L. C. Hsieh wrote on Sun, Aug 22, 2021 at 1:40 AM:
> > 
> > >
> > > BTW, I checked dependency tree, the flink-iceberg demo only has one Hadoop
> > > common dependency. So I'm not sure why Flink throws such exception. Based
> > > on Flink doc, I suppose that Flink binary doesn't include Hadoop
> > > dependencies, right?
> > >
> > > Based on the exception, looks like when FlinkCatalogFactory (from Iceberg)
> > > calls HadoopUtils.getHadoopConfiguration (from Flink), their classloaders
> > > are different and referring to different Hadoop Configuration Class 
> > > objects.
> > >
> > > I'm not familiar with Flink. So I'm wondering what step is wrong during
> > > the testing? It is a pretty simple test to verify Iceberg and Flink.
> > >
> > > On 2021/08/21 08:50:05, L. C. Hsieh  wrote:
> > > >
> > > > Thanks for replying.
> > > >
> > > > I'm using Flink 1.12.x. And I think Iceberg 0.12 uses Flink 1.12
> > > actually.
> > > >
> > > > Once I upgraded the Iceberg from 0.11.0 to 0.12.0 for the Java
> > > application. I got new exception as below:
> > > >
> > > > java.lang.LinkageError: loader constraint violation: when resolving
> > > method
> > > "org.apache.flink.runtime.util.HadoopUtils.getHadoopConfiguration(Lorg/apache/flink/configuration/Configuration;)Lorg/apache/hadoop/conf/Configuration;"
> > > the class loader (instance of org/apache/flink/util/ChildFirstClassLoader)
> > > of the current class, org/apache/iceberg/flink/FlinkCatalogFactory, and 
> > > the
> > > class loader (instance of sun/misc/Launcher$AppClassLoader) for the
> > > method's defining class, org/apache/flink/runtime/util/HadoopUtils, have
> > > different Class objects for the type org/apache/hadoop/conf/Configuration
> > > used in the signature at
> > > org.apach

Re: Got NoClassDefFoundError on Flink native K8S cluster

2021-08-22 Thread L . C . Hsieh


As far as I know, flink-shaded-hadoop has not been officially supported since Flink 1.11
(https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/hadoop.html).

Anyway, I installed the Hadoop common package into the Docker images to make Flink
happy. I marked the Hadoop dependencies in the iceberg-test application as
"provided". That looks okay, as it no longer throws the previous LinkageError.

But there is a new issue: the submitted application cannot run because the task
manager pod fails to start. Describing the pod status shows these warnings:

  Warning  FailedMount  4m4s                  kubelet    Unable to
attach or mount volumes: unmounted volumes=[hadoop-config-volume], unattached
volumes=[flink-config-volume kube-api-access-772x5 hadoop-config-volume]: timed
out waiting for the condition
  Warning  FailedMount  108s (x2 over 6m22s)  kubelet    Unable to
attach or mount volumes: unmounted volumes=[hadoop-config-volume], unattached
volumes=[hadoop-config-volume flink-config-volume kube-api-access-772x5]: timed
out waiting for the condition
  Warning  FailedMount  11s (x12 over 8m25s)  kubelet
MountVolume.SetUp failed for volume "hadoop-config-volume" : configmap
"hadoop-config-my-first-flink-cluster" not found

It seems it cannot mount "hadoop-config-volume".

From the doc
(https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/),
it looks like Flink adds some internal ConfigMap volumes
automatically. So again, I am not sure what is wrong in the above steps...


On 2021/08/22 10:01:25, Manong Karl  wrote: 
> I prefer using flink bundled hadoop, such as
> https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar.
> May help.
> 
> L. C. Hsieh wrote on Sun, Aug 22, 2021 at 1:40 AM:
> 
> >
> > BTW, I checked dependency tree, the flink-iceberg demo only has one Hadoop
> > common dependency. So I'm not sure why Flink throws such exception. Based
> > on Flink doc, I suppose that Flink binary doesn't include Hadoop
> > dependencies, right?
> >
> > Based on the exception, looks like when FlinkCatalogFactory (from Iceberg)
> > calls HadoopUtils.getHadoopConfiguration (from Flink), their classloaders
> > are different and referring to different Hadoop Configuration Class objects.
> >
> > I'm not familiar with Flink. So I'm wondering what step is wrong during
> > the testing? It is a pretty simple test to verify Iceberg and Flink.
> >
> > On 2021/08/21 08:50:05, L. C. Hsieh  wrote:
> > >
> > > Thanks for replying.
> > >
> > > I'm using Flink 1.12.x. And I think Iceberg 0.12 uses Flink 1.12
> > actually.
> > >
> > > Once I upgraded the Iceberg from 0.11.0 to 0.12.0 for the Java
> > application. I got new exception as below:
> > >
> > > java.lang.LinkageError: loader constraint violation: when resolving
> > method
> > "org.apache.flink.runtime.util.HadoopUtils.getHadoopConfiguration(Lorg/apache/flink/configuration/Configuration;)Lorg/apache/hadoop/conf/Configuration;"
> > the class loader (instance of org/apache/flink/util/ChildFirstClassLoader)
> > of the current class, org/apache/iceberg/flink/FlinkCatalogFactory, and the
> > class loader (instance of sun/misc/Launcher$AppClassLoader) for the
> > method's defining class, org/apache/flink/runtime/util/HadoopUtils, have
> > different Class objects for the type org/apache/hadoop/conf/Configuration
> > used in the signature at
> > org.apache.iceberg.flink.FlinkCatalogFactory.clusterHadoopConf(FlinkCatalogFactory.java:152)
> > >
> > >
> > > On 2021/08/21 08:11:33, Manong Karl  wrote:
> > > > Iceberg v0.11 or v0.12 not capable with flink v1.13.x.
> > > >
> > > > L. C. Hsieh wrote on Sat, Aug 21, 2021 at 3:52 PM:
> > > >
> > > > > Hi, I'm testing using Flink to write Iceberg table. I run Flink
> > native K8S
> > > > > cluster locally and submit a simple Java program that writes out
> > Iceberg
> > > > > table (https://github.com/spancer/flink-iceberg-demo). But got an
> > > > > exception:
> > > > >
> > > > > java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration
> > at
> > > > >
> > org.apache.iceberg.flink.FlinkCatalogFactory.clusterHadoopConf(FlinkCatalogFactory.java:148)
> > > > > at
> > > > >
> > org.apache.iceberg.flink.TableLoader.fromHadoopTable(TableLoader.java:49)
> > > > > at
> > > > >
> > com.coomia.iceberg.test.IcebergReadWriteTest.main(IcebergReadWriteTest.java:89)
> > > > >
> > > > > The uploaded is a fat jar. I also checked the uploaded application
> > jar. It
> > > > > has the Configuration class. So I don't know what is wrong there.
> > Any idea
> > > > > or suggestion? Thanks.
> > > > >
> > > >
> > >
> >
> 


Re: Got NoClassDefFoundError on Flink native K8S cluster

2021-08-21 Thread L . C . Hsieh


BTW, I checked the dependency tree; the flink-iceberg demo only has one Hadoop
common dependency. So I'm not sure why Flink throws such an exception. Based on
the Flink docs, I assume the Flink binary doesn't include Hadoop dependencies,
right?

Based on the exception, it looks like when FlinkCatalogFactory (from Iceberg)
calls HadoopUtils.getHadoopConfiguration (from Flink), their classloaders are
different and refer to different Hadoop Configuration class objects.

I'm not familiar with Flink, so I'm wondering which step is wrong in the
testing. It is a pretty simple test to verify Iceberg and Flink.

On 2021/08/21 08:50:05, L. C. Hsieh  wrote: 
> 
> Thanks for replying.
> 
> I'm using Flink 1.12.x. And I think Iceberg 0.12 uses Flink 1.12 actually.
> 
> Once I upgraded the Iceberg from 0.11.0 to 0.12.0 for the Java application. I 
> got new exception as below:
> 
> java.lang.LinkageError: loader constraint violation: when resolving method 
> "org.apache.flink.runtime.util.HadoopUtils.getHadoopConfiguration(Lorg/apache/flink/configuration/Configuration;)Lorg/apache/hadoop/conf/Configuration;"
>  the class loader (instance of org/apache/flink/util/ChildFirstClassLoader) 
> of the current class, org/apache/iceberg/flink/FlinkCatalogFactory, and the 
> class loader (instance of sun/misc/Launcher$AppClassLoader) for the method's 
> defining class, org/apache/flink/runtime/util/HadoopUtils, have different 
> Class objects for the type org/apache/hadoop/conf/Configuration used in the 
> signature at 
> org.apache.iceberg.flink.FlinkCatalogFactory.clusterHadoopConf(FlinkCatalogFactory.java:152)
> 
> 
> On 2021/08/21 08:11:33, Manong Karl  wrote: 
> > Iceberg v0.11 or v0.12 not capable with flink v1.13.x.
> > 
> > L. C. Hsieh wrote on Sat, Aug 21, 2021 at 3:52 PM:
> > 
> > > Hi, I'm testing using Flink to write Iceberg table. I run Flink native K8S
> > > cluster locally and submit a simple Java program that writes out Iceberg
> > > table (https://github.com/spancer/flink-iceberg-demo). But got an
> > > exception:
> > >
> > > java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration at
> > > org.apache.iceberg.flink.FlinkCatalogFactory.clusterHadoopConf(FlinkCatalogFactory.java:148)
> > > at
> > > org.apache.iceberg.flink.TableLoader.fromHadoopTable(TableLoader.java:49)
> > > at
> > > com.coomia.iceberg.test.IcebergReadWriteTest.main(IcebergReadWriteTest.java:89)
> > >
> > > The uploaded is a fat jar. I also checked the uploaded application jar. It
> > > has the Configuration class. So I don't know what is wrong there. Any idea
> > > or suggestion? Thanks.
> > >
> > 
> 


Re: Got NoClassDefFoundError on Flink native K8S cluster

2021-08-21 Thread L . C . Hsieh


Thanks for replying.

I'm using Flink 1.12.x, and I think Iceberg 0.12 actually uses Flink 1.12.

Once I upgraded Iceberg from 0.11.0 to 0.12.0 in the Java application, I got a
new exception, shown below:

java.lang.LinkageError: loader constraint violation: when resolving method 
"org.apache.flink.runtime.util.HadoopUtils.getHadoopConfiguration(Lorg/apache/flink/configuration/Configuration;)Lorg/apache/hadoop/conf/Configuration;"
 the class loader (instance of org/apache/flink/util/ChildFirstClassLoader) of 
the current class, org/apache/iceberg/flink/FlinkCatalogFactory, and the class 
loader (instance of sun/misc/Launcher$AppClassLoader) for the method's defining 
class, org/apache/flink/runtime/util/HadoopUtils, have different Class objects 
for the type org/apache/hadoop/conf/Configuration used in the signature at 
org.apache.iceberg.flink.FlinkCatalogFactory.clusterHadoopConf(FlinkCatalogFactory.java:152)


On 2021/08/21 08:11:33, Manong Karl  wrote: 
> Iceberg v0.11 or v0.12 not capable with flink v1.13.x.
> 
> L. C. Hsieh wrote on Sat, Aug 21, 2021 at 3:52 PM:
> 
> > Hi, I'm testing using Flink to write Iceberg table. I run Flink native K8S
> > cluster locally and submit a simple Java program that writes out Iceberg
> > table (https://github.com/spancer/flink-iceberg-demo). But got an
> > exception:
> >
> > java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration at
> > org.apache.iceberg.flink.FlinkCatalogFactory.clusterHadoopConf(FlinkCatalogFactory.java:148)
> > at
> > org.apache.iceberg.flink.TableLoader.fromHadoopTable(TableLoader.java:49)
> > at
> > com.coomia.iceberg.test.IcebergReadWriteTest.main(IcebergReadWriteTest.java:89)
> >
> > The uploaded is a fat jar. I also checked the uploaded application jar. It
> > has the Configuration class. So I don't know what is wrong there. Any idea
> > or suggestion? Thanks.
> >
> 


Got NoClassDefFoundError on Flink native K8S cluster

2021-08-21 Thread L . C . Hsieh
Hi, I'm testing using Flink to write an Iceberg table. I run a Flink native K8S
cluster locally and submit a simple Java program that writes out an Iceberg table
(https://github.com/spancer/flink-iceberg-demo), but I got an exception:

java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration at 
org.apache.iceberg.flink.FlinkCatalogFactory.clusterHadoopConf(FlinkCatalogFactory.java:148)
 at org.apache.iceberg.flink.TableLoader.fromHadoopTable(TableLoader.java:49) 
at 
com.coomia.iceberg.test.IcebergReadWriteTest.main(IcebergReadWriteTest.java:89)

The uploaded jar is a fat jar. I also checked the uploaded application jar; it has
the Configuration class. So I don't know what is wrong there. Any idea or
suggestion? Thanks.


Fixed-delay restart strategy - attempt counting logic

2021-08-18 Thread much l
Hi everyone:
I'd like to ask: when the restart strategy is restart-strategy: fixed-delay, is its parameter
restart-strategy.fixed-delay.attempts
counted globally (over the job's whole lifetime)? Or is the retry count reset each time the job recovers through the HA failure handling, so that the next failure starts counting again from 0?
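For reference, a minimal sketch of the programmatic equivalent of the configuration being asked about; the values are placeholders, and the question above concerns how the first argument (the attempts counter) is tracked:

```
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FixedDelayRestartSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Programmatic equivalent of:
        //   restart-strategy: fixed-delay
        //   restart-strategy.fixed-delay.attempts: 3
        //   restart-strategy.fixed-delay.delay: 10 s
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                3,                                // attempts - the counter in question
                Time.of(10, TimeUnit.SECONDS)));  // delay between restart attempts
    }
}
```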


????flink????state

2021-02-06 Thread ???????L
import org.apache.flink.api.java.functions.KeySelector; import 
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; 
import org.apache.flink.streaming.api.windowing.time.Time;   ... 
DataStream

??????flinkSQL??ValueStateDescriptor????????StateTtlConfig

2021-02-03 Thread ???????L
streamTableEnv.getConfig().setIdleStateRetention(Duration.ofDays(1));




----
??: "stgztsw"http://apache-flink.147419.n8.nabble.com/

flink sql

2021-02-01 Thread ???????L
hi, ?? ??1.12flink sql  
??datastream?,

??????????: ????: ??????????

2021-02-01 Thread ???????L
,??1.12?? EnvironmentSettings settings = 
EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
StreamExecutionEnvironment executionEnvironment = 
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment streamTableEnv = 
StreamTableEnvironment.create(executionEnvironment, settings);
Configuration configuration = streamTableEnv.getConfig().getConfiguration();
configuration.setString("table.exec.source.idle-timeout","1000 ms");




----
??: 
   "user-zh"

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/event_timestamps_watermarks.html#dealing-with-idle-sources

best,
amenhub



nbsp;
 ???L
?? 2021-02-01 17:20
 user-zh
?? ??
flink1.12, kafka??3??, flink??3??. ??, 
, ??,, 
?
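The readable parts of this exchange set table.exec.source.idle-timeout and point to the "dealing with idle sources" documentation linked above, i.e. Kafka partitions that stop producing hold back the overall watermark. A minimal DataStream-side sketch of the same idea using withIdleness; the event type and durations are placeholders:

```
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;

public class IdleSourceSketch {

    // Placeholder event type; only the watermark wiring matters here.
    public static class Event {
        public long timestamp;
    }

    public static WatermarkStrategy<Event> strategy() {
        return WatermarkStrategy
                .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                .withTimestampAssigner((event, recordTs) -> event.timestamp)
                // Declare a subtask idle after 10s without data, so an empty
                // Kafka partition does not hold back the overall watermark.
                .withIdleness(Duration.ofSeconds(10));
    }
}
```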

??????????: ??????????

2021-02-01 Thread ???????L
, ?? Configuration configuration = 
streamTableEnv.getConfig().getConfiguration();
configuration.setString("table.exec.source.idle-timeout","1000 ms"); 





----
??: 
   "user-zh"

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/event_timestamps_watermarks.html#dealing-with-idle-sources

best,
amenhub




 ???L
?? 2021-02-01 17:20
 user-zh
?? ??
flink1.12, kafka??3??, flink??3??. ??, 
, ??,, 
?

??????????

2021-02-01 Thread ???????L
flink1.12, kafka??3??, flink??3??. ??, 
, ??,, 
?

?????? flink sql????????

2021-01-31 Thread ???????L
 streamTableEnv.getConfig().setLocalTimeZone(ZoneId.of("+8")) 
. ,??,




----
??: 
   "user-zh"



flink sql????????

2021-01-31 Thread ???????L
streamTableEnv.getConfig().setLocalTimeZone(ZoneId.of("+8"))

flink sql ????????????

2021-01-31 Thread ???????L
flink sql+8,??

?????? ????????????

2021-01-29 Thread ???????L





kafka ??3??, ??, flink??3


----
??: 
   "user-zh"

http://apache-flink.147419.n8.nabble.com/

????????????

2021-01-29 Thread ???????L
EnvironmentSettings settings = 
EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
StreamExecutionEnvironment executionEnvironment = 
StreamExecutionEnvironment.getExecutionEnvironment();
executionEnvironment.setParallelism(3);
Map

FlinkSQL Filter Error With Float Column on flink-1.12.0

2021-01-12 Thread jy l
Hi:
Filtering data with Flink SQL throws an exception.
code:
def main(args: Array[String]): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val source = env.fromElements(
    (1.0f, 11.0f, 12.0f),
    (2.0f, 21.0f, 22.0f),
    (3.0f, 31.0f, 32.0f),
    (4.0f, 41.0f, 42.0f),
    (5.0f, 51.0f, 52.0f)
  )
  val settings = EnvironmentSettings.newInstance()
    .inStreamingMode()
    .useBlinkPlanner()
    .build()
  val tEnv = StreamTableEnvironment.create(env, settings)
  tEnv.createTemporaryView("myTable", source, $("id"), $("f1"), $("f2"))
  val query =
    """
      |select * from myTable where id in (1.0, 2.0, 3.0)
      |""".stripMargin
  tEnv.executeSql(query).print()
}

exception:
Exception in thread "main" java.lang.UnsupportedOperationException: class
org.apache.calcite.sql.type.SqlTypeName: FLOAT
at org.apache.calcite.util.Util.needToImplement(Util.java:1075)
at org.apache.calcite.rex.RexLiteral.appendAsJava(RexLiteral.java:703)
at org.apache.calcite.rex.RexLiteral.toJavaString(RexLiteral.java:408)
at org.apache.calcite.rex.RexLiteral.computeDigest(RexLiteral.java:276)
at org.apache.calcite.rex.RexLiteral.<init>(RexLiteral.java:223)
at org.apache.calcite.rex.RexLiteral.toLiteral(RexLiteral.java:737)
at org.apache.calcite.rex.RexLiteral.lambda$printSarg$4(RexLiteral.java:710)
at org.apache.calcite.util.RangeSets$Printer.singleton(RangeSets.java:397)
at org.apache.calcite.util.RangeSets.forEach(RangeSets.java:237)
at org.apache.calcite.util.Sarg.lambda$printTo$0(Sarg.java:110)
at org.apache.calcite.linq4j.Ord.forEach(Ord.java:157)
at org.apache.calcite.util.Sarg.printTo(Sarg.java:106)
at org.apache.calcite.rex.RexLiteral.printSarg(RexLiteral.java:709)
at
org.apache.calcite.rex.RexLiteral.lambda$appendAsJava$1(RexLiteral.java:652)
at org.apache.calcite.util.Util.asStringBuilder(Util.java:2502)

Why is that? How do I solve it?

thanks.
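The stack trace shows the failure while Calcite renders the Sarg that the IN predicate is converted to, and the FLOAT literal type is what it cannot handle, so this looks like a planner/Calcite bug worth reporting. As a possible workaround (untested, and assuming the problem is limited to FLOAT literal handling), widening the compared type so the IN list does not bind to FLOAT may avoid that code path; the call below reuses the tEnv from the snippet above:

```
// Hypothetical workaround sketch: compare on DOUBLE instead of FLOAT so the
// IN-list literals are not typed as FLOAT. Untested against the reported error.
tEnv.executeSql(
        "select * from myTable where CAST(id AS DOUBLE) in (1.0, 2.0, 3.0)").print();
```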




Filter push down in DynamicTableSource

2020-12-24 Thread jy l
Hi:
Dear experts, I'd like to ask a question.
On flink-1.12.0 I defined a custom DynamicTableSource that supports SupportsFilterPushDown, SupportsProjectionPushDown, and other abilities.
My ScanRuntimeProvider uses InputFormatProvider.
At runtime, the pushed-down filters only arrive after the InputFormat has been created and copy() has been called, so how can I still use the filters inside the InputFormat to filter the data source?

My understanding is that the methods of SupportsFilterPushDown and SupportsProjectionPushDown should be called after the DynamicTableSource's copy() method, so that I first obtain the projection and filters and only then create the InputFormat; that way I can filter out the unneeded data at the source based on the filters, so that only the data I need reaches Flink. But at the moment these methods seem to be called before the InputFormat is created.

If anyone knows, please let me know. Thanks!
Best regards!
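Not an authoritative answer, but one common way to structure such a source is to keep the filters passed to applyFilters() as state on the source (and carry them over in copy()), and to build the InputFormat only inside getScanRuntimeProvider(), which the planner calls after the push-down methods. A skeleton sketch under that assumption; the actual filter translation (createInputFormat) is hypothetical and source-specific:

```
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.InputFormatProvider;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.expressions.ResolvedExpression;

// Skeleton of a ScanTableSource that remembers the pushed-down filters and only
// builds the InputFormat inside getScanRuntimeProvider().
public class FilteringTableSource implements ScanTableSource, SupportsFilterPushDown {

    private List<ResolvedExpression> pushedFilters = new ArrayList<>();

    @Override
    public Result applyFilters(List<ResolvedExpression> filters) {
        // Remember the filters; also report them all as "remaining" so the planner
        // still applies them, in case source-side filtering is only best-effort.
        this.pushedFilters = new ArrayList<>(filters);
        return Result.of(new ArrayList<>(filters), new ArrayList<>(filters));
    }

    @Override
    public ChangelogMode getChangelogMode() {
        return ChangelogMode.insertOnly();
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
        // Build the InputFormat lazily, so the filters gathered in applyFilters()
        // are available when the format is created.
        InputFormat<RowData, ?> format = createInputFormat(pushedFilters);
        return InputFormatProvider.of(format);
    }

    @Override
    public DynamicTableSource copy() {
        FilteringTableSource copy = new FilteringTableSource();
        copy.pushedFilters = new ArrayList<>(pushedFilters); // keep filters across copies
        return copy;
    }

    @Override
    public String asSummaryString() {
        return "FilteringTableSource";
    }

    private InputFormat<RowData, ?> createInputFormat(List<ResolvedExpression> filters) {
        // Hypothetical: translate the ResolvedExpressions into whatever predicate
        // representation the underlying InputFormat understands.
        throw new UnsupportedOperationException("source-specific");
    }
}
```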


How do I add parameters in TableEnvironment so that they can be read in TaskManagerServices?

2020-12-22 Thread jy l
Hi:
My program is developed with the Flink Table/SQL API. I want to set parameters so that they take effect in TaskManagerServices. How do I set them?

With the settings below, the value I set cannot be read in TaskManagerServices.
val settings = EnvironmentSettings.newInstance()
.inBatchMode()
.build()
val tEnv = TableEnvironment.create(settings)
tEnv.getConfig.getConfiguration.set(CoreOptions.CHECK_LEAKED_CLASSLOADER,
  java.lang.Boolean.FALSE)

How can I set it so that it takes effect in TaskManagerServices?


-
Thanks!


java.lang.IllegalStateException: Trying to access closed classloader.

2020-12-21 Thread jy l
Hi:
When I run my Flink program in IDEA, it reports the following exception:
Exception in thread "Thread-22" java.lang.IllegalStateException: Trying to
access closed classloader. Please check if you store classloaders directly
or indirectly in static fields. If the stacktrace suggests that the leak
occurs in a third party library and cannot be fixed immediately, you can
disable this check with the configuration
'classloader.check-leaked-classloader'.
at
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.ensureInner(FlinkUserCodeClassLoaders.java:161)
at
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.getResource(FlinkUserCodeClassLoaders.java:179)
at org.apache.hadoop.conf.Configuration.getResource(Configuration.java:2780)
at
org.apache.hadoop.conf.Configuration.getStreamReader(Configuration.java:3036)
at
org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2995)
at
org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2968)
at org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2848)
at org.apache.hadoop.conf.Configuration.get(Configuration.java:1200)
at
org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1812)
at
org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1789)
at
org.apache.hadoop.util.ShutdownHookManager.getShutdownTimeout(ShutdownHookManager.java:183)
at
org.apache.hadoop.util.ShutdownHookManager.shutdownExecutor(ShutdownHookManager.java:145)
at
org.apache.hadoop.util.ShutdownHookManager.access$300(ShutdownHookManager.java:65)
at
org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:102)

What causes this exception, and how should I handle it when it occurs?

If anyone knows, please let me know. Thanks.
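Judging from the stack trace, it is Hadoop's shutdown hook reading its Configuration after the job's user-code classloader has already been closed, which is what the leak check complains about. If it only shows up at shutdown in the IDE, the check can be disabled with the option named in the message; a sketch of passing it to a local environment (the config key is taken from the error text, everything else is a placeholder), since options set only on the TableConfig do not reach the TaskManager side:

```
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalEnvWithConfig {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Option named in the exception message; disables the leaked-classloader check.
        conf.setString("classloader.check-leaked-classloader", "false");

        // A local (IDE) environment created with this Configuration applies it to
        // the embedded cluster, unlike settings made only on the TableConfig.
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(1, conf);
    }
}
```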


Can the filter pushdown from flink-1.10.0 still be used in flink-1.12.0?

2020-12-20 Thread jy l
Hi:
In flink-1.10.0 we defined a custom connector and implemented the FilterableTableSource interface. Now that flink-1.12.0 has been released, we want to move our previous Expression parsing to the new version, but we found that the old way of parsing no longer works there.
Does the new version no longer support the flink-1.10.0 way of parsing Expression? Or does the old way still work and I'm just using it incorrectly? In 1.12.0, how do I parse an Expression into pushed-down filters such as Or, EqualTo, or LessThan?

Best regards!


tEnv.executeSql(query).print() does not successfully consume data from Kafka

2020-12-12 Thread jy l
Hi:
I'm consuming Kafka data with Flink. I created a table as follows:

val kafkaSource =
  """
|create table kafka_order(
|order_id string,
|order_price decimal(10,2),
|order_time timestamp(3)
|)
|with(
|'connector' = 'kafka',
|'topic' = 'iceberg.order',
|'properties.bootstrap.servers' = 'hostname:9092',
|'format' = 'json',
|'properties.group.id' = 'data-lake',
|'scan.startup.mode' = 'earliest-offset',
|'json.ignore-parse-errors' = 'false'
|)
|""".stripMargin
tEnv.executeSql(kafkaSource)

When I query it directly and print to the console, it fails to consume, as follows:
 val query =
  """
|select * from kafka_order
|""".stripMargin
tEnv.executeSql(query).print()

But if I create a print connector table and then do insert into that table select * from kafka_order, it consumes normally, as follows:
val print =
  """
|create table p_order(
|order_id string,
|order_price decimal(10,2),
|order_time timestamp(3)
|)
|with(
|'connector' = 'print'
|)
|""".stripMargin
tEnv.executeSql(print)
val query =
  """
|insert into p_order
|select * from kafka_order
|""".stripMargin
tEnv.executeSql(query)

Why exactly is that? If anyone knows, please let me know; much appreciated.

Best regards!


A temporary table created earlier cannot be found in the new catalog

2020-12-11 Thread jy l
Hi,
I created a temporary table via the following API:
val oStream: DataStream[Order] = env.addSource(new BugSource)
tEnv.createTemporaryView("t_order", oStream, 'order_id, 'order_price,
'order_time)

Then I created a new catalog and database, as follows:
val catalog =
  """
|create catalog hive_catalog with (
| 'type'='iceberg',
| 'catalog-type'='hive',
| 'uri'='thrift://hostname:9083',
| 'clients'='5',
| 'property-version'='1',
| 'warehouse'='hdfs://hostname:8020/hive'
|)
|""".stripMargin

tEnv.executeSql(catalog)
tEnv.useCatalog("hive_catalog")

val database =
  """
|CREATE DATABASE iceberg_db
|""".stripMargin

tEnv.executeSql(database)

tEnv.useDatabase("iceberg_db")

val ice_order =
  """
|create table ice_t_order(
|order_id string,
|order_price decimal(10,2),
|order_time timestamp(3)
|)
|""".stripMargin
tEnv.executeSql(ice_order)

val query =
  """
|insert into ice_t_order
|select * from t_order
|""".stripMargin
tEnv.executeSql(query)

When I execute this, it throws an exception saying table t_order cannot be found, with the following message:
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Object
't_order' not found
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at
org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:550)
... 27 more

For a problem like this, what should I do so that the temporary table created earlier can be used in the new catalog?
If anyone knows, please let me know. Thanks!

Best regards!
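Most likely the temporary view was registered under the catalog and database that were current when createTemporaryView was called (the defaults here), so after useCatalog("hive_catalog") the unqualified name t_order no longer resolves. A minimal sketch of referencing it by its fully qualified name, reusing the tEnv from the snippet above and assuming the defaults were still default_catalog.default_database at creation time:

```
// After switching to hive_catalog.iceberg_db, refer to the temporary view by the
// catalog/database it was created under (assumed to be the defaults here).
tEnv.executeSql(
        "insert into ice_t_order " +
        "select * from default_catalog.default_database.t_order");
```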


Why doesn't ScanTableSource support SupportsFilterPushDown?

2020-12-07 Thread jy l
Hi:
For business reasons, I want to define a custom ScanTableSource in flink-1.11.2 and implement the SupportsFilterPushDown ability.
But while exploring, I found that the following code seems to forbid using the SupportsFilterPushDown ability.

val unsupportedAbilities = List(
 classOf[SupportsFilterPushDown],
 classOf[SupportsLimitPushDown],
 classOf[SupportsPartitionPushDown],
 classOf[SupportsComputedColumnPushDown],
 classOf[SupportsWatermarkPushDown])
unsupportedAbilities.foreach { ability =>
 if (ability.isAssignableFrom(tableSource.getClass)) {
 throw new UnsupportedOperationException("Currently, a DynamicTableSource
with " +
 s"${ability.getSimpleName} ability is not supported.")
 }
}

The description of SupportsFilterPushDown says: "Enables to push down filters into a {@link
ScanTableSource}."

Isn't that a bit contradictory?

How can I implement filter push down on a ScanTableSource?
And which sources implement SupportsFilterPushDown?

If anyone knows, please let me know. Thanks.

Best regards!


DeserializationSchemaFactory not found in the SQL CLI

2020-11-24 Thread jy l
Hi:
Flink version 1.12.0:

I want to configure a table in sql-client-defaults.yaml, configured as follows:

tables:
  - name: t_users
    type: source-table
    connector:
      property-version: 1
      type: kafka
      version: universal
      topic: ods.userAnalysis.user_profile
      startup-mode: latest-offset
      properties:
        bootstrap.servers: hostname:9092
        group.id: flink-analysis
    format:
      type: debezium-avro-confluent
      property-version: 1
      debezium-avro-confluent.schema-registry.url: http://hostname:8081
      #schema-registry.url: http://hostname:8081
    schema:
      - name: userId
        data-type: STRING
      - name: province
        data-type: STRING
      - name: city
        data-type: STRING
      - name: age
        data-type: INT
      - name: education
        data-type: STRING
      - name: jobType
        data-type: STRING
      - name: marriage
        data-type: STRING
      - name: sex
        data-type: STRING
      - name: interest
        data-type: STRING




I have put the relevant jars into the lib directory, but starting the SQL CLI reports the following error:

Exception in thread "main"
org.apache.flink.table.client.SqlClientException: Unexpected exception.
This is a bug. Please consider filing an issue.

at org.apache.flink.table.client.SqlClient.main(SqlClient.java:208)

Caused by: org.apache.flink.table.client.gateway.SqlExecutionException:
Could not create execution context.

at
org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:878)

at
org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:226)

at org.apache.flink.table.client.SqlClient.start(SqlClient.java:108)

at org.apache.flink.table.client.SqlClient.main(SqlClient.java:196)

Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
Could not find a suitable table factory for
'org.apache.flink.table.factories.DeserializationSchemaFactory' in

the classpath.


Reason: Required context properties mismatch.


The following properties are requested:

connector.properties.bootstrap.servers=henghe66:9092

connector.properties.group.id=flink-analysis

connector.property-version=1

connector.startup-mode=latest-offset

connector.topic=ods.userAnalysis.user_profile

connector.type=kafka

connector.version=universal

format.debezium-avro-confluent.schema-registry.url=
http://192.168.101.43:8081

format.property-version=1

format.type=debezium-avro-confluent

schema.0.data-type=VARCHAR(2147483647)

schema.0.name=userId

schema.1.data-type=VARCHAR(2147483647)

schema.1.name=province

schema.2.data-type=VARCHAR(2147483647)

schema.2.name=city

schema.3.data-type=INT

schema.3.name=age

schema.4.data-type=VARCHAR(2147483647)

schema.4.name=education

schema.5.data-type=VARCHAR(2147483647)

schema.5.name=jobType

schema.6.data-type=VARCHAR(2147483647)

schema.6.name=marriage

schema.7.data-type=VARCHAR(2147483647)

schema.7.name=sex

schema.8.data-type=VARCHAR(2147483647)

schema.8.name=interest


The following factories have been considered:

org.apache.flink.formats.avro.AvroRowFormatFactory

org.apache.flink.formats.csv.CsvRowFormatFactory

org.apache.flink.formats.json.JsonRowFormatFactory

at
org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)

at
org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)

at
org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)

at
org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113)

at
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.getDeserializationSchema(KafkaTableSourceSinkFactoryBase.java:289)

at
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.createStreamTableSource(KafkaTableSourceSinkFactoryBase.java:171)

at
org.apache.flink.table.factories.StreamTableSourceFactory.createTableSource(StreamTableSourceFactory.java:61)

at
org.apache.flink.table.factories.TableSourceFactory.createTableSource(TableSourceFactory.java:63)

at
org.apache.flink.table.factories.TableSourceFactory.createTableSource(TableSourceFactory.java:74)

at
org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:391)

at
org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$7(ExecutionContext.java:646)

at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)

at
org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:644)

at
org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:529)

at
org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:185)

at
org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:138)

at
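For what it's worth, the factory list in that error only contains the old Avro/Csv/Json row-format factories, which suggests the YAML environment file is resolved through the legacy descriptor stack; as far as I know the debezium-avro-confluent format is only wired into the newer DDL-based tables ('connector' = 'kafka', 'format' = ...). A minimal sketch of registering the same table through a DDL instead, assuming Flink 1.12 with the Kafka connector and Avro Confluent registry format jars in lib/, and with the hostnames kept as placeholders:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class RegisterDebeziumTable {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Same schema and connector options as in the YAML above, expressed as a DDL.
        tEnv.executeSql(
            "CREATE TABLE user_profile (\n"
                + "  userId STRING, province STRING, city STRING, age INT,\n"
                + "  education STRING, jobType STRING, marriage STRING, sex STRING, interest STRING\n"
                + ") WITH (\n"
                + "  'connector' = 'kafka',\n"
                + "  'topic' = 'ods.userAnalysis.user_profile',\n"
                + "  'properties.bootstrap.servers' = 'hostname:9092',\n"
                + "  'properties.group.id' = 'flink-analysis',\n"
                + "  'scan.startup.mode' = 'latest-offset',\n"
                + "  'format' = 'debezium-avro-confluent',\n"
                + "  'debezium-avro-confluent.schema-registry.url' = 'http://hostname:8081'\n"
                + ")");

        // Quick smoke test; in the SQL CLI the same CREATE TABLE can be typed directly.
        tEnv.executeSql("SELECT * FROM user_profile").print();
    }
}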

Re: A Flink SQL mystery: adding a certain table column to the query makes it fail

2020-11-23 Thread jy l
I am using release-1.12.0-rc1.

Best

Jark Wu  wrote on Tue, Nov 24, 2020 at 11:42 AM:

> The error looks like a bug. Which version are you using?
> You could open an issue on JIRA.
>
> Best,
> Jark
>
> On Tue, 24 Nov 2020 at 11:27, jy l  wrote:
>
> > Hi:
> > Something very strange happened while I was using Flink SQL. Details below:
> >
> > My DDL:
> > create table if not exists t_order(
> > id int PRIMARY KEY comment '订单id',
> > timestamps bigint comment '订单创建时间',
> > orderInformationId string comment '订单信息ID',
> > userId string comment '用户ID',
> > categoryId int comment '商品类别',
> > productId int comment '商品ID',
> > price decimal(10,2) comment '单价',
> > productCount int comment '购买数量',
> > priceSum decimal(10,2) comment '订单总价',
> > shipAddress string comment '商家地址',
> > receiverAddress string comment '收货地址',
> > ts AS TO_TIMESTAMP(FROM_UNIXTIME(timestamps/1000)),
> > WATERMARK FOR ts AS ts - INTERVAL '3' SECOND
> > )with(
> > 'connector' = 'kafka',
> > 'format' = 'debezium-avro-confluent',
> > 'debezium-avro-confluent.schema-registry.url' = 'http://手动打码ip:8081',
> > 'topic' = 'ods.userAnalysis.order',
> > 'properties.bootstrap.servers' = '手动打码ip:9092',
> > 'properties.group.id' = 'flink-analysis',
> > 'scan.startup.mode' = 'latest-offset'
> > )
> >
> > When I query this table, the following statements all return results normally:
> >
> >- select * from t_order
> >- select receiverAddress from t_order
> >- select
> >id,
> >timestamps,
> >orderInformationId,
> >userId,
> >categoryId,
> >productId,
> >price,
> >productCount,
> >priceSum,
> >shipAddress
> >from t_order
> >
> > But when I add the receiverAddress field to the third statement, the query no longer works. The SQL is:
> > select
> > id,
> > timestamps,
> > orderInformationId,
> > userId,
> > categoryId,
> > productId,
> > price,
> > productCount,
> > priceSum,
> > shipAddress,
> > receiverAddress
> > from t_order,
> > The error message is as follows:
> > Exception in thread "main" org.apache.flink.table.api.TableException:
> This
> > calc has no useful projection and no filter. It should be removed by
> > CalcRemoveRule.
> > at
> >
> >
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateProcessCode(CalcCodeGenerator.scala:166)
> > at
> >
> >
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateCalcOperator(CalcCodeGenerator.scala:59)
> > at
> >
> >
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:84)
> > at
> >
> >
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
> > at
> >
> >
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
> > at
> >
> >
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
> > at
> >
> >
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
> > at
> >
> >
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToTransformation(StreamExecLegacySink.scala:158)
> > at
> >
> >
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:82)
> > at
> >
> >
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)
> > at
> >
> >
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
> > ...
> >
> > The receiverAddress column is clearly declared in my DDL, and selecting it on its own also works.
> > What exactly is causing this? Any pointers would be appreciated.
> >
> >
> > Best regards!
> >
>


A Flink SQL mystery: adding a certain table column to the query makes it fail

2020-11-23 Thread jy l
Hi:
Something very strange happened while I was using Flink SQL. Details below:

My DDL:
create table if not exists t_order(
id int PRIMARY KEY comment '订单id',
timestamps bigint comment '订单创建时间',
orderInformationId string comment '订单信息ID',
userId string comment '用户ID',
categoryId int comment '商品类别',
productId int comment '商品ID',
price decimal(10,2) comment '单价',
productCount int comment '购买数量',
priceSum decimal(10,2) comment '订单总价',
shipAddress string comment '商家地址',
receiverAddress string comment '收货地址',
ts AS TO_TIMESTAMP(FROM_UNIXTIME(timestamps/1000)),
WATERMARK FOR ts AS ts - INTERVAL '3' SECOND
)with(
'connector' = 'kafka',
'format' = 'debezium-avro-confluent',
'debezium-avro-confluent.schema-registry.url' = 'http://手动打码ip:8081',
'topic' = 'ods.userAnalysis.order',
'properties.bootstrap.servers' = '手动打码ip:9092',
'properties.group.id' = 'flink-analysis',
'scan.startup.mode' = 'latest-offset'
)

When I query this table, the following statements all return results normally:

   - select * from t_order
   - select receiverAddress from t_order
   - select
   id,
   timestamps,
   orderInformationId,
   userId,
   categoryId,
   productId,
   price,
   productCount,
   priceSum,
   shipAddress
   from t_order

But when I add the receiverAddress field to the third statement, the query no longer works. The SQL is:
select
id,
timestamps,
orderInformationId,
userId,
categoryId,
productId,
price,
productCount,
priceSum,
shipAddress,
receiverAddress
from t_order,
The error message is as follows:
Exception in thread "main" org.apache.flink.table.api.TableException: This
calc has no useful projection and no filter. It should be removed by
CalcRemoveRule.
at
org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateProcessCode(CalcCodeGenerator.scala:166)
at
org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateCalcOperator(CalcCodeGenerator.scala:59)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:84)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToTransformation(StreamExecLegacySink.scala:158)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:82)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
...

The receiverAddress column is clearly declared in my DDL, and selecting it on its own also works.
What exactly is causing this? Any pointers would be appreciated.


Best regards!


How does Flink SQL handle late data?

2020-11-23 Thread jy l
Hi:
A question: in Flink SQL I defined a watermark with a maximum delay when creating my table, but some records still arrive even later than that. We do not want to simply drop such data, so how can I collect these still-late records? Is there a way to side-output late data into a separate stream, like in the DataStream API?

Best regards!
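As far as I know Flink SQL does not expose a late-data side output of its own, but if dropping is not acceptable, the DataStream API equivalent is sideOutputLateData on a window. A minimal sketch, with a toy in-memory source standing in for the real input and all names illustrative:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

import java.time.Duration;

public class LateDataSideOutput {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Toy input of (key, eventTimeMillis) with a 3 s bounded-out-of-orderness watermark.
        DataStream<Tuple2<String, Long>> input = env
            .fromElements(Tuple2.of("a", 1000L), Tuple2.of("a", 7000L), Tuple2.of("a", 500L))
            .assignTimestampsAndWatermarks(
                WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                    .withTimestampAssigner((element, ts) -> element.f1));

        // Records arriving later than watermark + allowed lateness end up under this tag.
        OutputTag<Tuple2<String, Long>> lateTag = new OutputTag<Tuple2<String, Long>>("late-data") {};

        SingleOutputStreamOperator<Tuple2<String, Long>> counted = input
            .keyBy(t -> t.f0)
            .window(TumblingEventTimeWindows.of(Time.seconds(5)))
            .allowedLateness(Time.seconds(10))   // slightly late records still update the result
            .sideOutputLateData(lateTag)         // anything later than that is collected, not dropped
            .sum(1);

        counted.print("on-time");
        counted.getSideOutput(lateTag).print("late");

        env.execute("late data side output");
    }
}

The records under the "late" tag can then go to their own sink (a dead-letter topic, a table for reprocessing, and so on) instead of being discarded.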


Re: Help needed with windowed group aggregation on Flink SQL CDC

2020-11-22 Thread jy l
OK, I will give it a try. Thanks.

Best

Jark Wu  wrote on Mon, Nov 23, 2020 at 2:06 PM:

> Then wouldn't a non-windowed aggregation with a 5s mini-batch meet your requirement?
>
> Best,
> Jark
>
> On Mon, 23 Nov 2020 at 13:16, jy l  wrote:
>
> > Our use case: based on the order table of our order system, we want Flink to compute the total transaction amount every 5 seconds.
> > Our current architecture is roughly mysql(debezium)>kafka--->flink>es
> >
> > Jark Wu  wrote on Mon, Nov 23, 2020 at 10:35 AM:
> >
> > > Flink SQL window aggregation currently does not support inputs containing update and delete messages.
> > > You can use a non-windowed aggregation instead.
> > >
> > > Btw, could you describe your use case? Why do you need window operations on CDC data?
> > >
> > > Best,
> > > Jark
> > >
> > > On Mon, 23 Nov 2020 at 10:28, jy l  wrote:
> > >
> > > > Hi:
> > > > I use the CDC tool Debezium to send dimension-table data from MySQL to Kafka in real time, and then consume the Kafka data with Flink SQL. The overall flow is as follows:
> > > > [image: image.png]
> > > > [image: image.png]
> > > > The grouped aggregation SQL is:
> > > > [image: image.png]
> > > > When the job runs, it throws the following exception:
> > > > Exception in thread "main" org.apache.flink.table.api.TableException:
> > > > GroupWindowAggregate doesn't support consuming update and delete
> > changes
> > > > which is produced by node TableSourceScan(table=[[default_catalog,
> > > > default_database, t_order,
> > watermark=[-(TO_TIMESTAMP(FROM_UNIXTIME(/($1,
> > > > 1000))), 3000:INTERVAL SECOND)]]], fields=[id, timestamps,
> > > > orderInformationId, userId, categoryId, productId, price,
> productCount,
> > > > priceSum, shipAddress, receiverAddress])
> > > > at
> > > >
> > >
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:380)
> > > > at
> > > >
> > >
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:298)
> > > > at
> > > >
> > >
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChild(FlinkChangelogModeInferenceProgram.scala:337)
> > > > at
> > > >
> > >
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1(FlinkChangelogModeInferenceProgram.scala:326)
> > > > at
> > > >
> > >
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1$adapted(FlinkChangelogModeInferenceProgram.scala:325)
> > > > at
> > > >
> > >
> >
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
> > > > at scala.collection.immutable.Range.foreach(Range.scala:155)
> > > > at scala.collection.TraversableLike.map(TraversableLike.scala:233)
> > > > at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
> > > > at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> > > > at
> > > >
> > >
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:325)
> > > > at
> > > >
> > >
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:275)
> > > > at
> > > >
> > >
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChild(FlinkChangelogModeInferenceProgram.scala:337)
> > > > at
> > > >
> > >
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1(FlinkChangelogModeInferenceProgram.scala:326)
> > > > at
> > > >
> > >
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1$adapted(FlinkChangelogModeInferenceProgram.scala:325)
> > > > at
> > > >
> > >
> >
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
> > > >
> > > > My initial understanding is that Flink SQL does not support windowed group aggregation over a CDC stream like this, made up of inserts, updates and deletes.
> > > > Given my situation, what approach should I use instead?
> > > > If anyone knows, please let me know. Thanks!
> > > >
> > > > Best regards
> > > >
> > > >
> > >
> >
>
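Following up on the non-windowed aggregation plus mini-batch suggestion above, a minimal sketch of what that could look like, assuming Flink 1.11+ and that t_order is already registered as in the DDL earlier in this thread; the 5-second bucket expression and the option values are illustrative:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class NonWindowedCdcAggregation {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
            EnvironmentSettings.newInstance().inStreamingMode().build());

        // Mini-batch buffers incoming changes for up to 5 s before emitting,
        // which roughly matches the "total amount every 5 seconds" requirement.
        tEnv.getConfig().getConfiguration().setString("table.exec.mini-batch.enabled", "true");
        tEnv.getConfig().getConfiguration().setString("table.exec.mini-batch.allow-latency", "5 s");
        tEnv.getConfig().getConfiguration().setString("table.exec.mini-batch.size", "5000");

        // A regular (non-windowed) GROUP BY accepts the insert/update/delete changes
        // from the CDC source; grouping by a 5-second bucket derived from the order
        // timestamp yields one continuously updated row per interval.
        tEnv.executeSql(
            "SELECT FLOOR(timestamps / 5000) AS bucket_5s, SUM(priceSum) AS total_amount\n"
                + "FROM t_order\n"
                + "GROUP BY FLOOR(timestamps / 5000)").print();
    }
}

Because the result is an updating table, the sink (Elasticsearch in the architecture above) needs to handle upserts keyed on the bucket.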


Re: Help needed with windowed group aggregation on Flink SQL CDC

2020-11-22 Thread jy l
Our use case: based on the order table of our order system, we want Flink to compute the total transaction amount every 5 seconds.
Our current architecture is roughly mysql(debezium)>kafka--->flink>es

Jark Wu  wrote on Mon, Nov 23, 2020 at 10:35 AM:

> Flink SQL window aggregation currently does not support inputs containing update and delete messages.
> You can use a non-windowed aggregation instead.
>
> Btw, could you describe your use case? Why do you need window operations on CDC data?
>
> Best,
> Jark
>
> On Mon, 23 Nov 2020 at 10:28, jy l  wrote:
>
> > Hi:
> > I use the CDC tool Debezium to send dimension-table data from MySQL to Kafka in real time, and then consume the Kafka data with Flink SQL. The overall flow is as follows:
> > [image: image.png]
> > [image: image.png]
> > The grouped aggregation SQL is:
> > [image: image.png]
> > When the job runs, it throws the following exception:
> > Exception in thread "main" org.apache.flink.table.api.TableException:
> > GroupWindowAggregate doesn't support consuming update and delete changes
> > which is produced by node TableSourceScan(table=[[default_catalog,
> > default_database, t_order, watermark=[-(TO_TIMESTAMP(FROM_UNIXTIME(/($1,
> > 1000))), 3000:INTERVAL SECOND)]]], fields=[id, timestamps,
> > orderInformationId, userId, categoryId, productId, price, productCount,
> > priceSum, shipAddress, receiverAddress])
> > at
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:380)
> > at
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:298)
> > at
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChild(FlinkChangelogModeInferenceProgram.scala:337)
> > at
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1(FlinkChangelogModeInferenceProgram.scala:326)
> > at
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1$adapted(FlinkChangelogModeInferenceProgram.scala:325)
> > at
> >
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
> > at scala.collection.immutable.Range.foreach(Range.scala:155)
> > at scala.collection.TraversableLike.map(TraversableLike.scala:233)
> > at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
> > at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> > at
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:325)
> > at
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:275)
> > at
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChild(FlinkChangelogModeInferenceProgram.scala:337)
> > at
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1(FlinkChangelogModeInferenceProgram.scala:326)
> > at
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1$adapted(FlinkChangelogModeInferenceProgram.scala:325)
> > at
> >
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
> >
> > My initial understanding is that Flink SQL does not support windowed group aggregation over a CDC stream like this, made up of inserts, updates and deletes.
> > Given my situation, what approach should I use instead?
> > If anyone knows, please let me know. Thanks!
> >
> > Best regards
> >
> >
>


Help needed with windowed group aggregation on Flink SQL CDC

2020-11-22 Thread jy l
Hi:
I use the CDC tool Debezium to send dimension-table data from MySQL to Kafka in real time, and then consume the Kafka data with Flink SQL. The overall flow is as follows:
[image: image.png]
[image: image.png]
The grouped aggregation SQL is:
[image: image.png]
When the job runs, it throws the following exception:
Exception in thread "main" org.apache.flink.table.api.TableException:
GroupWindowAggregate doesn't support consuming update and delete changes
which is produced by node TableSourceScan(table=[[default_catalog,
default_database, t_order, watermark=[-(TO_TIMESTAMP(FROM_UNIXTIME(/($1,
1000))), 3000:INTERVAL SECOND)]]], fields=[id, timestamps,
orderInformationId, userId, categoryId, productId, price, productCount,
priceSum, shipAddress, receiverAddress])
at
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:380)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:298)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChild(FlinkChangelogModeInferenceProgram.scala:337)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1(FlinkChangelogModeInferenceProgram.scala:326)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1$adapted(FlinkChangelogModeInferenceProgram.scala:325)
at
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
at scala.collection.immutable.Range.foreach(Range.scala:155)
at scala.collection.TraversableLike.map(TraversableLike.scala:233)
at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:325)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:275)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChild(FlinkChangelogModeInferenceProgram.scala:337)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1(FlinkChangelogModeInferenceProgram.scala:326)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1$adapted(FlinkChangelogModeInferenceProgram.scala:325)
at
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)

My initial understanding is that Flink SQL does not support windowed group aggregation over a CDC stream like this, made up of inserts, updates and deletes.
Given my situation, what approach should I use instead?
If anyone knows, please let me know. Thanks!

Best regards


About global windows

2020-11-19 Thread j l
Hello, I have a few questions while looking at global windows. First, the global window is globally unique, meaning all elements flow into this one window instance. If that is the case, what is the point of the keyBy in front of it? Everything ends up in one window anyway. My tests seem to confirm this: I defined a custom global window and set the parallelism of the process operator, yet there really is only one window.
Example:

dataUnion.keyBy(0).window(new StreamToBatchWindow()).process(new
StreamToBatchProcess()).setParallelism(20).print();

If that is so, doesn't this window become a bottleneck? I'm not sure whether my understanding is correct; I was hoping to have more windows so that the different keyed streams each get their own global-window processing.
The other question is that a global window keeps state for every key, so if keys keep growing, won't the state blow up? How can I clear the state of keys that will never appear again?

Thanks!
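Not an authoritative answer, but my understanding is that with keyBy(...).window(GlobalWindows.create()) the window operator does run in parallel and keeps separate window contents per key; the singleton GlobalWindow object only means that every key shares the same window "bucket" identifier, while windowAll(...) is what forces parallelism 1. For the state concern, a purging trigger clears a key's buffered elements every time it fires, so the per-key contents do not grow without bound; for keys that will never come back, keyed state TTL or a timer in a process function is the usual cleanup route. A minimal sketch with a hypothetical count-based purge:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

public class KeyedGlobalWindowSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Long>> input = env.fromElements(
            Tuple2.of("a", 1L), Tuple2.of("b", 2L), Tuple2.of("a", 3L), Tuple2.of("a", 4L));

        input
            .keyBy(t -> t.f0)
            .window(GlobalWindows.create())
            // FIRE_AND_PURGE every 2 elements per key: contents are cleared after each firing.
            .trigger(PurgingTrigger.of(CountTrigger.of(2)))
            .process(new ProcessWindowFunction<Tuple2<String, Long>, String, String, GlobalWindow>() {
                @Override
                public void process(String key, Context ctx,
                                    Iterable<Tuple2<String, Long>> elements, Collector<String> out) {
                    long count = 0;
                    for (Tuple2<String, Long> ignored : elements) {
                        count++;
                    }
                    out.collect(key + " -> batch of " + count);
                }
            })
            .setParallelism(4)   // the keyed window operator runs in parallel; keys are hashed across subtasks
            .print();

        env.execute("keyed global window sketch");
    }
}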


Re: Flink batch mode: The rpc invocation size 113602196 exceeds the maximum akka framesize

2020-09-28 Thread jy l
If you use operators such as print(), the result of the previous task is pulled over in one go, and the error occurs because the pulled data exceeds the akka framesize.

李加燕  wrote on Mon, Sep 28, 2020 at 3:07 PM:

> In Flink batch mode I read files from HDFS and run a word count,
> but the task keeps running forever. Looking at the taskmanager log, I found the following exception:
> java.lang.reflect.UndeclaredThrowableException: null
> at com.sun.proxy.$Proxy35.updateTaskExecutionState(Unknown Source)
> ~[?:?]
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.updateTaskExecutionState(TaskExecutor.java:1558)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.unregisterTaskAndNotifyFinalState(TaskExecutor.java:1588)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.access$2400(TaskExecutor.java:173)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor$TaskManagerActionsImpl.lambda$updateTaskExecutionState$1(TaskExecutor.java:1921)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.actor.Actor.aroundReceive(Actor.scala:517)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.actor.Actor.aroundReceive$(Actor.scala:515)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> [flink-dist_2.12-1.11.1.jar:1.11.1]
> Caused by: java.io.IOException: The rpc invocation size 113602196 exceeds
> the maximum akka framesize.
> at
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(AkkaInvocationHandler.java:276)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:205)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:134)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaInvocationHandler.invoke(FencedAkkaInvocationHandler.java:79)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> ... 28 more
> I have tried setting the akka framesize to 30M in flink-conf.yaml, but that still does not solve the problem above.
> Please help.
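To add to the reply above: print()/collect() gather the whole result through akka RPC messages (the updateTaskExecutionState call in the trace), which is what hits the framesize limit. Two things that usually help are writing the result to a distributed sink instead of printing it, and raising akka.framesize in flink-conf.yaml (if I remember correctly the default is a plain byte value such as 10485760b, so a larger value in the same format). A minimal word-count sketch that writes to HDFS rather than printing, with placeholder paths:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.util.Collector;

public class WordCountToSink {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> lines = env.readTextFile("hdfs:///input/path");   // placeholder

        DataSet<Tuple2<String, Integer>> counts = lines
            .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
                for (String word : line.toLowerCase().split("\\W+")) {
                    if (!word.isEmpty()) {
                        out.collect(Tuple2.of(word, 1));
                    }
                }
            })
            .returns(Types.TUPLE(Types.STRING, Types.INT))
            .groupBy(0)
            .sum(1);

        // A file sink keeps the records on the TaskManagers; nothing is pulled
        // back through RPC, so the result size no longer matters for akka.framesize.
        counts.writeAsCsv("hdfs:///output/path", FileSystem.WriteMode.OVERWRITE);

        env.execute("word count to sink");
    }
}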


No response when uploading a jar through the Flink web UI

2020-08-14 Thread Shiyuan L
When uploading a jar through the Flink web UI the connection gets reset. I am not sure why; has anyone run into this?
[image: pic_2020-08-15_10-39-37.png]
[image: pic_2020-08-15_10-40-09.png]


Request for removal from subscription

2019-12-10 Thread L Jainkeri, Suman (Nokia - IN/Bangalore)
Unsubscribe


Re: Re: How to periodically write state to external storage

2019-10-31 Thread misaki L
How about aggregating with a window and writing in batches?

wangl...@geekplus.com.cn  wrote on Fri, Nov 1, 2019 at 10:17 AM:

> Hi Congxian,
>
> Even if it is written out as a sink, it ultimately has to land somewhere so it can be queried.
> In our case we write it to MySQL.
>
>
>
> wangl...@geekplus.com.cn
>
> Sender: Congxian Qiu
> Send Time: 2019-11-01 10:10
> Receiver: user-zh
> Subject: Re: How to periodically write state to external storage
> Curious why you want to write the State to external storage periodically. Does an external system need to use this State? If so, why not write the State out as a sink?
>
> Best,
> Congxian
>
>
> Jun Zhang <825875...@qq.com> wrote on Thu, Oct 31, 2019 at 10:36 AM:
>
> > Could you register a timer?
> >
> >
> > Take a look at this article and see whether it helps:
> >
> >
> > https://mp.weixin.qq.com/s/VUEvvoHoupZMpxWQsEeHEA
> >  On 2019-10-31 10:16, wangl...@geekplus.com.cn<
> wangl...@geekplus.com.cn
> > wrote:
> >
> >
> > It is message-driven with a very high QPS; every incoming message updates the state value, and if we wrote to external storage on every message the downstream system could not keep up.
> > Is there a way to periodically read the state and write it to external storage?
> > I am currently on Flink 1.7.2.
> >
> >
> >
> >
> >
> > wangl...@geekplus.com.cn
>
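In case it helps, a minimal sketch of the timer idea: keep the value in keyed state on every message, but only emit it towards the external store (MySQL in this case) when a periodic processing-time timer fires. This assumes a Flink version with KeyedProcessFunction; on 1.7.2 the same logic can be written with a plain ProcessFunction applied after keyBy, keeping the key in state since getCurrentKey() is not available there. The interval and field names are illustrative.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class PeriodicStateFlush extends KeyedProcessFunction<String, Tuple2<String, Long>, Tuple2<String, Long>> {

    private static final long FLUSH_INTERVAL_MS = 60_000L;   // flush once per minute

    private transient ValueState<Long> counter;
    private transient ValueState<Boolean> timerRegistered;

    @Override
    public void open(Configuration parameters) {
        counter = getRuntimeContext().getState(new ValueStateDescriptor<>("counter", Long.class));
        timerRegistered = getRuntimeContext().getState(new ValueStateDescriptor<>("timer-registered", Boolean.class));
    }

    @Override
    public void processElement(Tuple2<String, Long> value, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
        // Update state on every message, but do not emit anything yet.
        Long current = counter.value();
        counter.update(current == null ? value.f1 : current + value.f1);

        // Register one flush timer per key.
        if (timerRegistered.value() == null) {
            ctx.timerService().registerProcessingTimeTimer(
                ctx.timerService().currentProcessingTime() + FLUSH_INTERVAL_MS);
            timerRegistered.update(true);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) throws Exception {
        Long current = counter.value();
        if (current != null) {
            // The downstream JDBC/MySQL sink only sees one record per key per interval.
            out.collect(Tuple2.of(ctx.getCurrentKey(), current));
        }
        // Schedule the next flush.
        ctx.timerService().registerProcessingTimeTimer(timestamp + FLUSH_INTERVAL_MS);
    }
}

It would be wired up as something like stream.keyBy(t -> t.f0).process(new PeriodicStateFlush()).addSink(mySqlSink), with mySqlSink being whatever JDBC sink is already in use.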


Re: [Discuss] What should the "Data Source" be translated into Chinese

2019-08-13 Thread misaki L
I also prefer not to translate Data Source and Data Sink.

Kurt Young  wrote on Tue, Aug 13, 2019 at 6:21 PM:

> "更倾向不去翻译Data Source和Data Sink, 通过用中文对其做解释即可"   +1
>
> Best,
> Kurt
>
>
> On Tue, Aug 13, 2019 at 6:18 PM Simon Su  wrote:
>
> > I'd prefer not to translate Data Source and Data Sink; explaining them in Chinese is enough.
> >
> >
> > Thanks,
> > SImon
> >
> >
> > On 08/13/2019 18:07, wrote:
> > How about translating "data sink" into "数据漕"?
> > 漕 is pronounced cáo; its basic meaning is transporting grain by waterway, as in 漕运 | 漕粮. ==>
> > https://baike.baidu.com/item/%E6%BC%95?forcehttps=1%3Ffr%3Dkg_hanyu
> >
> >
> >
> > - Original message -
> > From: Kurt Young 
> > To: dev , user-zh 
> > Subject: Re: [Discuss] What should the "Data Source" be translated into Chinese
> > Date: 2019-08-13 16:44
> >
> > cc user-zh mailing list, since there are lots of chinese speaking people.
> > Best,
> > Kurt
> > On Tue, Aug 13, 2019 at 4:02 PM WangHengwei  wrote:
> > Hi all,
> >
> >
> > I'm working on [FLINK-13405] Translate "Basic API Concepts" page into
> > Chinese. I have a problem.
> >
> > Usually we translate "Data Source" into "数据源" but there is no agreed
> > translation for "Data Sink". Since it often appears in documents, I think
> > we'd better to have a unified translation. I have some alternatives, e.g.
> > "数据沉淀","数据归" or "数据终".
> >
> > Committer Xingcan Cui has a good suggestion for "数据汇" which
> > corresponds to source ("数据源"). I asked Committer Jark Wu, he is also fine
> > with it. I think "数据汇" is a good representation of flow characteristics
> so
> > I would like to use it.
> >
> >
> > I want to hear more thoughts from the community whether we should
> > translate it and what it should be translated into.
> >
> >
> > Thanks,
> > WangHW
> >
>


Query - External SSL setup

2019-04-25 Thread L Jainkeri, Suman (Nokia - IN/Bangalore)
Hi,

I am trying to authenticate Flink using NGINX. The documentation mentions deploying a "sidecar proxy"; here is the link to the section of the document I referred to:
https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/security-ssl.html#external--rest-connectivity
I see that Flink users have already deployed proxies such as Envoy
Proxy or NGINX with MOD_AUTH.
Could I please get an example configuration showing how the NGINX proxy has been used for this SSL setup?

Thank you & regards,
Suman



Flink window operation based on event time is triggered when the watermark is less than the end of the window

2018-11-29 Thread X L
Please refer to the Stack Overflow question.
Thanks.

-- 
Thanks.

·
Lx
wlxwol...@gmail.com


Re: Duplicates in self join

2018-10-09 Thread Eric L Goodman
Interval join is exactly what I'm looking for.  Thanks for pointing it out!

On Mon, Oct 8, 2018 at 9:13 AM Fabian Hueske  wrote:

> Did you check the new interval join that was added with Flink 1.6.0 [1]?
> It might be better suited because each record has its own boundaries
> based on its timestamp and the join window interval.
>
> Best,
> Fabian
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/joining.html#interval-join
>
> Am Mo., 8. Okt. 2018 um 16:44 Uhr schrieb Eric L Goodman <
> eric.good...@colorado.edu>:
>
>> If I change it to a Tumbling window some of the results will be lost
>> since the pattern I'm matching has a temporal extent, so if the pattern
>> starts in one tumbling window and ends in the next, it won't be reported.
>> Based on the temporal length of the query, you can set the sliding window
>> and the window lengths to capture all the patterns, though as you note, you
>> will get duplicates.
>>
>> On Mon, Oct 8, 2018 at 7:46 AM Hequn Cheng  wrote:
>>
>>> Hi Eric,
>>>
>>> Can you change Sliding window to Tumbling window? The data of different
>>> sliding window are likely overlap.
>>>
>>> Best, Hequn
>>>
>>> On Mon, Oct 8, 2018 at 3:35 PM Dominik Wosiński 
>>> wrote:
>>>
>>>> Hey,
>>>> IMHO, the simplest way in your case would be to use the Evictor to
>>>> evict duplicate values after the window is generated. Have look at it here:
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.6/api/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.html
>>>>
>>>> Best Regards,
>>>> Dominik.
>>>>
>>>> pon., 8 paź 2018 o 08:00 Eric L Goodman 
>>>> napisał(a):
>>>>
>>>>> What is the best way to avoid or remove duplicates when joining a
>>>>> stream with itself?  I'm performing a streaming temporal triangle
>>>>> computation and the first part is to find triads of two edges of the form
>>>>> vertexA->vertexB and vertexB->vertexC (and there are temporal constraints
>>>>> where the first edge occurs before the second edge).  To do that, I have
>>>>> the following code:
>>>>>
>>>>> DataStream triads = edges.join(edges)
>>>>> .where(new DestKeySelector())
>>>>> .equalTo(new SourceKeySelector())
>>>>> .window(SlidingEventTimeWindows.of(Time.milliseconds(windowSizeMs),
>>>>> Time.milliseconds(slideSizeMs)))
>>>>> .apply(new EdgeJoiner(queryWindow));
>>>>>
>>>>> However, when I look at the triads being built, there are two copies of 
>>>>> each triad.
>>>>>
>>>>> For example, if I create ten edges (time, source, target):
>>>>>
>>>>> 0.0, 4, 0
>>>>>
>>>>> 0.01, 1, 5
>>>>>
>>>>> 0.02, 3, 7
>>>>>
>>>>> 0.03, 0, 8
>>>>>
>>>>> 0.04, 0, 9
>>>>>
>>>>> 0.05, 4, 8
>>>>>
>>>>> 0.06, 4, 3
>>>>>
>>>>> 0.07, 5, 9
>>>>>
>>>>> 0.08, 7, 1
>>>>>
>>>>> 0.09, 9, 6
>>>>>
>>>>>
>>>>> It creates the following triads (time1, source1, target1, time2,
>>>>> source2, targe2). Note there are two copies of each.
>>>>>
>>>>> 0.0, 4, 0 0.03, 0, 8
>>>>>
>>>>> 0.0, 4, 0 0.03, 0, 8
>>>>>
>>>>> 0.0, 4, 0 0.04, 0, 9
>>>>>
>>>>> 0.0, 4, 0 0.04, 0, 9
>>>>>
>>>>> 0.01, 1, 5 0.07, 5, 9
>>>>>
>>>>> 0.01, 1, 5 0.07, 5, 9
>>>>>
>>>>> 0.02, 3, 7 0.08, 7, 1
>>>>>
>>>>> 0.02, 3, 7 0.08, 7, 1
>>>>>
>>>>> 0.04, 0, 9 0.09, 9, 6
>>>>>
>>>>> 0.04, 0, 9 0.09, 9, 6
>>>>>
>>>>> 0.07, 5, 9 0.09, 9, 6
>>>>>
>>>>> 0.07, 5, 9 0.09, 9, 6
>>>>>
>>>>> I'm assuming this behavior has something to do with the joining of 
>>>>> "edges" with itself.
>>>>>
>>>>> I can provide more code if that would be helpful, but I believe I've 
>>>>> captured the most salient portion.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
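For anyone landing on this thread later, a minimal sketch of the interval join for the triad case, assuming event-time timestamps and watermarks are already assigned on the edge stream; the Tuple3 of (timestampMs, source, target) stands in for the original Edge class, which is not shown in the thread:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class IntervalJoinSketch {

    public static DataStream<Tuple2<Tuple3<Long, Integer, Integer>, Tuple3<Long, Integer, Integer>>> triads(
            DataStream<Tuple3<Long, Integer, Integer>> edges, long queryWindowMs) {

        return edges
            .keyBy(e -> e.f2)                       // first edge keyed by its target vertex
            .intervalJoin(edges.keyBy(e -> e.f1))   // second edge keyed by its source vertex
            // the second edge must fall in (firstEdge.ts, firstEdge.ts + queryWindowMs]
            .between(Time.milliseconds(1), Time.milliseconds(queryWindowMs))
            .process(new ProcessJoinFunction<Tuple3<Long, Integer, Integer>,
                    Tuple3<Long, Integer, Integer>,
                    Tuple2<Tuple3<Long, Integer, Integer>, Tuple3<Long, Integer, Integer>>>() {
                @Override
                public void processElement(Tuple3<Long, Integer, Integer> first,
                                           Tuple3<Long, Integer, Integer> second,
                                           Context ctx,
                                           Collector<Tuple2<Tuple3<Long, Integer, Integer>, Tuple3<Long, Integer, Integer>>> out) {
                    // Each qualifying pair is joined against per-record bounds exactly once,
                    // so there is no window overlap and no duplicate triads.
                    out.collect(Tuple2.of(first, second));
                }
            });
    }
}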


Re: Duplicates in self join

2018-10-09 Thread Eric L Goodman
When I switched to using TumblingEventTimeWindows, it did remove the
duplicates, which was somewhat surprising because with just 10 edges (.1
seconds in length), it should have fit within one window of the
SlidingEventTimeWindows (20 seconds window, 10 second slide).

On Mon, Oct 8, 2018 at 9:02 AM Hequn Cheng  wrote:

> Hi,
>
> I just want to verify my assumption that the duplicates are introduced by
> the sliding window instead of the join. When perform a Sliding window, a
> message can belong to  multi windows and the message will be joined multi
> times.
> If my assumption is correct, you can add a ProcessFunction after the join
> to do distinct.
>
> Best, Hequn
>
> On Mon, Oct 8, 2018 at 10:37 PM Eric L Goodman 
> wrote:
>
>> If I change it to a Tumbling window some of the results will be lost
>> since the pattern I'm matching has a temporal extent, so if the pattern
>> starts in one tumbling window and ends in the next, it won't be reported.
>> Based on the temporal length of the query, you can set the sliding window
>> and the window lengths to capture all the patterns, though as you note, you
>> will get duplicates.
>>
>> On Mon, Oct 8, 2018 at 7:46 AM Hequn Cheng  wrote:
>>
>>> Hi Eric,
>>>
>>> Can you change Sliding window to Tumbling window? The data of different
>>> sliding window are likely overlap.
>>>
>>> Best, Hequn
>>>
>>> On Mon, Oct 8, 2018 at 3:35 PM Dominik Wosiński 
>>> wrote:
>>>
>>>> Hey,
>>>> IMHO, the simplest way in your case would be to use the Evictor to
>>>> evict duplicate values after the window is generated. Have look at it here:
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.6/api/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.html
>>>>
>>>> Best Regards,
>>>> Dominik.
>>>>
>>>> pon., 8 paź 2018 o 08:00 Eric L Goodman 
>>>> napisał(a):
>>>>
>>>>> What is the best way to avoid or remove duplicates when joining a
>>>>> stream with itself?  I'm performing a streaming temporal triangle
>>>>> computation and the first part is to find triads of two edges of the form
>>>>> vertexA->vertexB and vertexB->vertexC (and there are temporal constraints
>>>>> where the first edge occurs before the second edge).  To do that, I have
>>>>> the following code:
>>>>>
>>>>> DataStream triads = edges.join(edges)
>>>>> .where(new DestKeySelector())
>>>>> .equalTo(new SourceKeySelector())
>>>>> .window(SlidingEventTimeWindows.of(Time.milliseconds(windowSizeMs),
>>>>> Time.milliseconds(slideSizeMs)))
>>>>> .apply(new EdgeJoiner(queryWindow));
>>>>>
>>>>> However, when I look at the triads being built, there are two copies of 
>>>>> each triad.
>>>>>
>>>>> For example, if I create ten edges (time, source, target):
>>>>>
>>>>> 0.0, 4, 0
>>>>>
>>>>> 0.01, 1, 5
>>>>>
>>>>> 0.02, 3, 7
>>>>>
>>>>> 0.03, 0, 8
>>>>>
>>>>> 0.04, 0, 9
>>>>>
>>>>> 0.05, 4, 8
>>>>>
>>>>> 0.06, 4, 3
>>>>>
>>>>> 0.07, 5, 9
>>>>>
>>>>> 0.08, 7, 1
>>>>>
>>>>> 0.09, 9, 6
>>>>>
>>>>>
>>>>> It creates the following triads (time1, source1, target1, time2,
>>>>> source2, targe2). Note there are two copies of each.
>>>>>
>>>>> 0.0, 4, 0 0.03, 0, 8
>>>>>
>>>>> 0.0, 4, 0 0.03, 0, 8
>>>>>
>>>>> 0.0, 4, 0 0.04, 0, 9
>>>>>
>>>>> 0.0, 4, 0 0.04, 0, 9
>>>>>
>>>>> 0.01, 1, 5 0.07, 5, 9
>>>>>
>>>>> 0.01, 1, 5 0.07, 5, 9
>>>>>
>>>>> 0.02, 3, 7 0.08, 7, 1
>>>>>
>>>>> 0.02, 3, 7 0.08, 7, 1
>>>>>
>>>>> 0.04, 0, 9 0.09, 9, 6
>>>>>
>>>>> 0.04, 0, 9 0.09, 9, 6
>>>>>
>>>>> 0.07, 5, 9 0.09, 9, 6
>>>>>
>>>>> 0.07, 5, 9 0.09, 9, 6
>>>>>
>>>>> I'm assuming this behavior has something to do with the joining of 
>>>>> "edges" with itself.
>>>>>
>>>>> I can provide more code if that would be helpful, but I believe I've 
>>>>> captured the most salient portion.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>


Re: Duplicates in self join

2018-10-08 Thread Eric L Goodman
If I change it to a Tumbling window some of the results will be lost since
the pattern I'm matching has a temporal extent, so if the pattern starts in
one tumbling window and ends in the next, it won't be reported.  Based on
the temporal length of the query, you can set the sliding window and the
window lengths to capture all the patterns, though as you note, you will
get duplicates.

On Mon, Oct 8, 2018 at 7:46 AM Hequn Cheng  wrote:

> Hi Eric,
>
> Can you change Sliding window to Tumbling window? The data of different
> sliding window are likely overlap.
>
> Best, Hequn
>
> On Mon, Oct 8, 2018 at 3:35 PM Dominik Wosiński  wrote:
>
>> Hey,
>> IMHO, the simplest way in your case would be to use the Evictor to evict
>> duplicate values after the window is generated. Have look at it here:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.6/api/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.html
>>
>> Best Regards,
>> Dominik.
>>
>> pon., 8 paź 2018 o 08:00 Eric L Goodman 
>> napisał(a):
>>
>>> What is the best way to avoid or remove duplicates when joining a stream
>>> with itself?  I'm performing a streaming temporal triangle computation and
>>> the first part is to find triads of two edges of the form vertexA->vertexB
>>> and vertexB->vertexC (and there are temporal constraints where the first
>>> edge occurs before the second edge).  To do that, I have the following code:
>>>
>>> DataStream triads = edges.join(edges)
>>> .where(new DestKeySelector())
>>> .equalTo(new SourceKeySelector())
>>> .window(SlidingEventTimeWindows.of(Time.milliseconds(windowSizeMs),
>>> Time.milliseconds(slideSizeMs)))
>>> .apply(new EdgeJoiner(queryWindow));
>>>
>>> However, when I look at the triads being built, there are two copies of 
>>> each triad.
>>>
>>> For example, if I create ten edges (time, source, target):
>>>
>>> 0.0, 4, 0
>>>
>>> 0.01, 1, 5
>>>
>>> 0.02, 3, 7
>>>
>>> 0.03, 0, 8
>>>
>>> 0.04, 0, 9
>>>
>>> 0.05, 4, 8
>>>
>>> 0.06, 4, 3
>>>
>>> 0.07, 5, 9
>>>
>>> 0.08, 7, 1
>>>
>>> 0.09, 9, 6
>>>
>>>
>>> It creates the following triads (time1, source1, target1, time2,
>>> source2, targe2). Note there are two copies of each.
>>>
>>> 0.0, 4, 0 0.03, 0, 8
>>>
>>> 0.0, 4, 0 0.03, 0, 8
>>>
>>> 0.0, 4, 0 0.04, 0, 9
>>>
>>> 0.0, 4, 0 0.04, 0, 9
>>>
>>> 0.01, 1, 5 0.07, 5, 9
>>>
>>> 0.01, 1, 5 0.07, 5, 9
>>>
>>> 0.02, 3, 7 0.08, 7, 1
>>>
>>> 0.02, 3, 7 0.08, 7, 1
>>>
>>> 0.04, 0, 9 0.09, 9, 6
>>>
>>> 0.04, 0, 9 0.09, 9, 6
>>>
>>> 0.07, 5, 9 0.09, 9, 6
>>>
>>> 0.07, 5, 9 0.09, 9, 6
>>>
>>> I'm assuming this behavior has something to do with the joining of "edges" 
>>> with itself.
>>>
>>> I can provide more code if that would be helpful, but I believe I've 
>>> captured the most salient portion.
>>>
>>>
>>>
>>>
>>>
>>>


Duplicates in self join

2018-10-08 Thread Eric L Goodman
What is the best way to avoid or remove duplicates when joining a stream
with itself?  I'm performing a streaming temporal triangle computation and
the first part is to find triads of two edges of the form vertexA->vertexB
and vertexB->vertexC (and there are temporal constraints where the first
edge occurs before the second edge).  To do that, I have the following code:

DataStream triads = edges.join(edges)
.where(new DestKeySelector())
.equalTo(new SourceKeySelector())
.window(SlidingEventTimeWindows.of(Time.milliseconds(windowSizeMs),
Time.milliseconds(slideSizeMs)))
.apply(new EdgeJoiner(queryWindow));

However, when I look at the triads being built, there are two copies
of each triad.

For example, if I create ten edges (time, source, target):

0.0, 4, 0

0.01, 1, 5

0.02, 3, 7

0.03, 0, 8

0.04, 0, 9

0.05, 4, 8

0.06, 4, 3

0.07, 5, 9

0.08, 7, 1

0.09, 9, 6


It creates the following triads (time1, source1, target1, time2, source2,
targe2). Note there are two copies of each.

0.0, 4, 0 0.03, 0, 8

0.0, 4, 0 0.03, 0, 8

0.0, 4, 0 0.04, 0, 9

0.0, 4, 0 0.04, 0, 9

0.01, 1, 5 0.07, 5, 9

0.01, 1, 5 0.07, 5, 9

0.02, 3, 7 0.08, 7, 1

0.02, 3, 7 0.08, 7, 1

0.04, 0, 9 0.09, 9, 6

0.04, 0, 9 0.09, 9, 6

0.07, 5, 9 0.09, 9, 6

0.07, 5, 9 0.09, 9, 6

I'm assuming this behavior has something to do with the joining of
"edges" with itself.

I can provide more code if that would be helpful, but I believe I've
captured the most salient portion.


multiple input streams

2018-08-31 Thread Eric L Goodman
If I have a standalone cluster running flink, what is the best way to
ingest multiple streams of the same type of data?

For example, if I open a socket text stream, does the socket only get
opened on the master node and then the stream is partitioned out to the
worker nodes?
 DataStream text = env.socketTextStream("localhost", port, "\n");

Is it possible to have each worker node be a sensor that receives a stream
of data, where each stream is of the same type (e.g. a series of tuples)?

Thanks
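Hedging since I don't know the exact setup: socketTextStream is a non-parallel source, so the socket is opened by a single source subtask on one TaskManager (not on the master), and only the resulting records are distributed to the downstream operators. If every worker should ingest its own feed of the same type, one option is a parallel source where each subtask opens its own connection; in practice a partitioned Kafka topic is the more common way to achieve this. A minimal sketch with a purely hypothetical host/port convention:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.Socket;

/** Each parallel subtask opens its own socket, so every worker slot ingests its own sensor feed. */
public class PerSubtaskSocketSource extends RichParallelSourceFunction<String> {

    private final int basePort;
    private volatile boolean running = true;

    public PerSubtaskSocketSource(int basePort) {
        this.basePort = basePort;
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // Hypothetical convention: subtask i reads from localhost:basePort + i on its worker.
        int subtask = getRuntimeContext().getIndexOfThisSubtask();
        try (Socket socket = new Socket("localhost", basePort + subtask);
             BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
            String line;
            while (running && (line = reader.readLine()) != null) {
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(line);
                }
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // One source subtask per sensor; together they form a single DataStream of lines.
        DataStream<String> text = env.addSource(new PerSubtaskSocketSource(9000)).setParallelism(4);
        text.print();
        env.execute("per-subtask socket source");
    }
}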