No. of task managers vs No. of task slots

2023-07-31 Thread Kamal Mittal via user
Hello Community,

Need info on the below:

  1.  How many task managers can a job manager handle? Is there an upper limit?

  2.  How should the number of task managers be decided? Is there a recommended approach?

  3.  What is the difference between a high number of task managers vs. a high number of task slots (with a low number of task managers)?

Rgds,
Kamal
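
For context, a minimal flink-conf.yaml sketch of the knobs behind these
questions (the values are arbitrary examples, not recommendations): each task
manager offers a fixed number of task slots, and the parallelism a job needs
determines how many slots - and therefore how many task managers - the
cluster must provide.

  # flink-conf.yaml (illustrative values only)
  taskmanager.numberOfTaskSlots: 4   # slots per task manager
  parallelism.default: 16            # 16 slots needed = 4 task managers x 4 slots

As a rule of thumb, many slots in few task managers share one JVM (lower
per-process overhead, less isolation), while many small task managers give
stronger isolation at the cost of more JVMs; there is no hard configured upper
limit on the number of task managers per job manager, though coordination
overhead grows with cluster size.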


Re: Java 17 for Flink 1.17 supported?

2023-07-31 Thread xiangyu feng
Hi Patricia,

JDK 17 will be supported in the 1.18 release. See this JIRA issue [1] for more.

[1] https://issues.apache.org/jira/browse/FLINK-15736

Best,
Xiangyu


patricia lee wrote on Mon, Jul 31, 2023 at 16:25:

> Hi,
>
> I was advised to upgrade the JDK of our Flink 1.17 to 17. However, in the
> documentation it only says, in bold, "Java 11".
>
> Will Java 17 support start with the Flink 1.18 release?
>
> Thanks
>


RE: Flink netty connector for TCP source

2023-07-31 Thread Kamal Mittal via user
Hello,

I created a custom server socket source in which the server socket is opened in
the split enumerator. With source parallelism = 2 it gives an error while
running - "Address is already in use" - i.e. it tried to open two server
sockets, one per parallel instance. Is that correct? My understanding is that
parallelism applies to the source operator and not to the split enumerator.

Please correct me if the above understanding is not correct.

Rgds,
Kamal

From: Hang Ruan 
Sent: 01 August 2023 08:55 AM
To: Kamal Mittal 
Cc: liu ron ; user@flink.apache.org
Subject: Re: Flink netty connector for TCP source

Hi, Kamal.

The SplitEnumerator is contained in the SourceCoordinator. They are only used 
in JM.

Best,
Hang

Kamal Mittal via user <user@flink.apache.org> wrote on Tue, Aug 1, 2023 at 10:43:
Thanks.

I looked at the link for custom data sources. One query: how can I make sure
the split enumerator executes on the Job Manager rather than on a Task Manager?

From: liu ron <ron9@gmail.com>
Sent: 31 July 2023 10:06 PM
To: user@flink.apache.org
Subject: Re: Flink netty connector for TCP source

Hi, Kamal
Currently, we don't provide a TCP-like source connector in the Flink main repo.
If you need this connector, you can try to implement it yourself by referring
to the FLIP-27 source docs [1].

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/sources/

Best,
Ron

Shammon FY <zjur...@gmail.com> wrote on Thu, Jul 27, 2023 at 11:23:
I cannot find any information about a netty source on the Flink website, and it
is not in the connector list [1], so I think it is not supported by the Flink
community.

[1]  
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/overview/

Best,
Shammon FY

On Thu, Jul 27, 2023 at 10:53 AM Kamal Mittal <kamal.mit...@ericsson.com> wrote:
Hello Shammon,

Yes, I am aware of the socket text stream, but I was wondering whether something
like ‘https://github.com/apache/bahir-flink/tree/master/flink-connector-netty’
is also supported by Flink?

Rgds,
Kamal

From: Shammon FY <zjur...@gmail.com>
Sent: 27 July 2023 08:15 AM
To: Kamal Mittal <kamal.mit...@ericsson.com>
Cc: user@flink.apache.org
Subject: Re: Flink netty connector for TCP source

Hi Kamal,

There's a socket text stream in `DataStream`; you can refer to [1] for more
details.

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/overview/#example-program

Best,
Shammon FY

On Wed, Jul 26, 2023 at 4:26 PM Kamal Mittal via user <user@flink.apache.org> wrote:
Hello,

Does Flink provide a netty connector for a custom TCP source?

Please share any documentation details.

Rgds,
Kamal
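
For readers of this thread, a structural sketch of a FLIP-27 source (class
names hypothetical, bodies stubbed) showing which pieces run on the Job Manager
versus the Task Managers - the point behind Hang's answer. A server socket
opened inside createEnumerator is opened exactly once, on the Job Manager,
regardless of the source parallelism:

import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.core.io.SimpleVersionedSerializer;

public class TcpSource implements Source<String, TcpSource.TcpSplit, Void> {

    public static class TcpSplit implements SourceSplit {
        @Override
        public String splitId() {
            return "tcp-split-0";
        }
    }

    @Override
    public Boundedness getBoundedness() {
        return Boundedness.CONTINUOUS_UNBOUNDED;
    }

    // Created once per job inside the SourceCoordinator, which lives on the
    // Job Manager; source parallelism does not multiply this call, so this is
    // the place to open a single server socket.
    @Override
    public SplitEnumerator<TcpSplit, Void> createEnumerator(
            SplitEnumeratorContext<TcpSplit> context) {
        throw new UnsupportedOperationException("sketch only");
    }

    @Override
    public SplitEnumerator<TcpSplit, Void> restoreEnumerator(
            SplitEnumeratorContext<TcpSplit> context, Void checkpoint) {
        throw new UnsupportedOperationException("sketch only");
    }

    // Created once per parallel subtask on the Task Managers -- with
    // parallelism = 2 this is instantiated twice, not the enumerator.
    @Override
    public SourceReader<String, TcpSplit> createReader(SourceReaderContext context) {
        throw new UnsupportedOperationException("sketch only");
    }

    @Override
    public SimpleVersionedSerializer<TcpSplit> getSplitSerializer() {
        throw new UnsupportedOperationException("sketch only");
    }

    @Override
    public SimpleVersionedSerializer<Void> getEnumeratorCheckpointSerializer() {
        throw new UnsupportedOperationException("sketch only");
    }
}

So if "Address already in use" appears with parallelism = 2, the socket is most
likely being opened from the reader side (or from code shared by both) rather
than from the enumerator.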


Re: Kafka Exception

2023-07-31 Thread Hang Ruan
Hi, Kenan.

Maybe you should set the `client.id.prefix` to avoid the conflict.

Best,
Hang

liu ron wrote on Mon, Jul 31, 2023 at 19:36:

> Hi, Kenan
>
> After studying the source code and searching Google for related
> information, I think this is caused by a duplicate client_id [1]. You can
> check whether other jobs use the same group_id when consuming this topic.
> The group_id is used in Flink to assemble the client_id [2]; if there are
> already jobs using the same group_id, the duplicated client_id will be
> detected on the Kafka side.
>
> [1]
> https://stackoverflow.com/questions/40880832/instancealreadyexistsexception-coming-from-kafka-consumer
> [2]
> https://github.com/apache/flink-connector-kafka/blob/79ae2d70499f81ce489911956c675354657dd44f/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java#L466
>
> Best,
> Ron
>
Kenan Kılıçtepe wrote on Tue, Jul 25, 2023 at 21:48:
>
>>
>>
>>
>>
>> Any help is appreciated regarding the exception below.
>> Also, my KafkaSource code is below. The parallelism is 16 for this task.
>>
>> KafkaSource<String> sourceStationsPeriodic = KafkaSource.<String>builder()
>>         .setBootstrapServers(parameter.get(KAFKA_SOURCE_STATIONS_BOOTLOADER_PROPERTY))
>>         .setTopics(parameter.get(KAFKA_TOPIC_READ_WIFI))
>>         .setGroupId(parameter.get(KAFKA_GROUP))
>>         .setStartingOffsets(OffsetsInitializer.latest())
>>         .setValueOnlyDeserializer(new SimpleStringSchema())
>>         .build();
>>
>> // Our Kafka Source
>> KafkaSource<String> sourceStationsWifiInterface = KafkaSource.<String>builder()
>>         .setBootstrapServers(parameter.get(KAFKA_SOURCE_STATIONS_BOOTLOADER_PROPERTY))
>>         .setTopics(parameter.get(KAFKA_TOPIC_READ_WIFI_INTERFACE))
>>         .setGroupId(parameter.get(KAFKA_GROUP))
>>         .setStartingOffsets(OffsetsInitializer.latest())
>>         .setValueOnlyDeserializer(new SimpleStringSchema())
>>         .build();
>>
>> KafkaSource<String> sourceTwinMessage = KafkaSource.<String>builder()
>>         .setBootstrapServers(parameter.get(KAFKA_SOURCE_STATIONS_BOOTLOADER_PROPERTY))
>>         .setTopics(parameter.get(KAFKA_TOPIC_READ_TWIN_MESSAGE))
>>         .setGroupId(parameter.get(KAFKA_GROUP))
>>         .setStartingOffsets(OffsetsInitializer.latest())
>>         .setValueOnlyDeserializer(new SimpleStringSchema())
>>         .build();
>>
>> KafkaSource<String> sourceStationsOnDemand = KafkaSource.<String>builder()
>>         .setBootstrapServers(parameter.get(KAFKA_SOURCE_STATIONS_BOOTLOADER_PROPERTY))
>>         .setTopics(parameter.get(KAFKA_TOPIC_READ_STATIONS_ON_DEMAND))
>>         .setGroupId(parameter.get(KAFKA_GROUP))
>>         .setStartingOffsets(OffsetsInitializer.latest())
>>         .setValueOnlyDeserializer(new SimpleStringSchema())
>>         .build();
>>
>> KafkaSource<String> sourceDeviceInfo = KafkaSource.<String>builder()
>>         .setBootstrapServers(parameter.get(KAFKA_SOURCE_STATIONS_BOOTLOADER_PROPERTY))
>>         .setTopics(parameter.get(KAFKA_TOPIC_READ_DEVICE_INFO))
>>         .setGroupId(parameter.get(KAFKA_GROUP))
>>         .setStartingOffsets(OffsetsInitializer.latest())
>>         .setValueOnlyDeserializer(new SimpleStringSchema())
>>         .build();
>>
>>
>>
>> 2023-07-23 07:06:24,927 WARN  org.apache.kafka.common.utils.AppInfoParser
>>  [] - Error registering AppInfo mbean
>> javax.management.InstanceAlreadyExistsException:
>> kafka.admin.client:type=app-info,id=wifialgogroup1-enumerator-admin-client
>>     at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:436) ~[?:?]
>>     at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1855) ~[?:?]
>>     at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:955) ~[?:?]
>>     at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:890) ~[?:?]
>>     at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:320
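
A concrete sketch of Hang's suggestion above, assuming the five sources keep
sharing one group id: the KafkaSource builder exposes the client.id.prefix
option via setClientIdPrefix, and giving each source its own prefix (the prefix
string below is an arbitrary example) avoids the colliding JMX mbean name
derived from the group id:

KafkaSource<String> sourceStationsPeriodic = KafkaSource.<String>builder()
        .setBootstrapServers(parameter.get(KAFKA_SOURCE_STATIONS_BOOTLOADER_PROPERTY))
        .setTopics(parameter.get(KAFKA_TOPIC_READ_WIFI))
        .setGroupId(parameter.get(KAFKA_GROUP))
        // Distinct per source, so the derived client_id -- and the
        // "kafka.admin.client:type=app-info,id=..." mbean name -- is unique:
        .setClientIdPrefix("wifialgogroup1-stations-periodic")
        .setStartingOffsets(OffsetsInitializer.latest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();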

Re: Flink netty connector for TCP source

2023-07-31 Thread Hang Ruan
Hi, Kamal.

The SplitEnumerator is contained in the SourceCoordinator. They are only
used in JM.

Best,
Hang

Kamal Mittal via user wrote on Tue, Aug 1, 2023 at 10:43:

> Thanks.
>
>
>
> I looked at the link for custom data sources. One query: how can I make
> sure the split enumerator executes on the Job Manager rather than on a
> Task Manager?
>
>
>
> *From:* liu ron 
> *Sent:* 31 July 2023 10:06 PM
> *To:* user@flink.apache.org
> *Subject:* Re: Flink netty connector for TCP source
>
>
>
> Hi, Kamal
>
> Currently, we don't provide a TCP-like source connector in the Flink main
> repo. If you need this connector, you can try to implement it yourself by
> referring to the FLIP-27 source docs [1].
>
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/sources/
>
>
>
> Best,
>
> Ron
>
>
>
> Shammon FY wrote on Thu, Jul 27, 2023 at 11:23:
>
> I cannot find any information about a netty source on the Flink website,
> and it is not in the connector list [1], so I think it is not supported by
> the Flink community.
>
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/overview/
>
>
>
> Best,
>
> Shammon FY
>
>
>
> On Thu, Jul 27, 2023 at 10:53 AM Kamal Mittal 
> wrote:
>
> Hello Shammon,
>
>
>
> Yes, I am aware of the socket text stream, but I was wondering whether
> something like
> ‘https://github.com/apache/bahir-flink/tree/master/flink-connector-netty’
> is also supported by Flink?
>
>
>
> Rgds,
>
> Kamal
>
>
>
> *From:* Shammon FY 
> *Sent:* 27 July 2023 08:15 AM
> *To:* Kamal Mittal 
> *Cc:* user@flink.apache.org
> *Subject:* Re: Flink netty connector for TCP source
>
>
>
> Hi Kamal,
>
>
>
> There's a socket text stream in `DataStream`; you can refer to [1] for
> more details.
>
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/overview/#example-program
>
>
>
> Best,
>
> Shammon FY
>
>
>
> On Wed, Jul 26, 2023 at 4:26 PM Kamal Mittal via user <
> user@flink.apache.org> wrote:
>
> Hello,
>
>
>
> Does Flink provide a netty connector for a custom TCP source?
>
>
>
> Please share any documentation details.
>
>
>
> Rgds,
>
> Kamal
>
>


RE: Flink netty connector for TCP source

2023-07-31 Thread Kamal Mittal via user
Thanks.

I looked at the link for custom data sources. One query: how can I make sure
the split enumerator executes on the Job Manager rather than on a Task Manager?

From: liu ron 
Sent: 31 July 2023 10:06 PM
To: user@flink.apache.org
Subject: Re: Flink netty connector for TCP source

Hi, Kamal
Currently, we don't provide a TCP-like source connector in the Flink main repo.
If you need this connector, you can try to implement it yourself by referring
to the FLIP-27 source docs [1].

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/sources/

Best,
Ron

Shammon FY <zjur...@gmail.com> wrote on Thu, Jul 27, 2023 at 11:23:
I cannot find any information about a netty source on the Flink website, and it
is not in the connector list [1], so I think it is not supported by the Flink
community.

[1]  
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/overview/

Best,
Shammon FY

On Thu, Jul 27, 2023 at 10:53 AM Kamal Mittal <kamal.mit...@ericsson.com> wrote:
Hello Shammon,

Yes, I am aware of the socket text stream, but I was wondering whether something
like ‘https://github.com/apache/bahir-flink/tree/master/flink-connector-netty’
is also supported by Flink?

Rgds,
Kamal

From: Shammon FY <zjur...@gmail.com>
Sent: 27 July 2023 08:15 AM
To: Kamal Mittal <kamal.mit...@ericsson.com>
Cc: user@flink.apache.org
Subject: Re: Flink netty connector for TCP source

Hi Kamal,

There's a socket text stream in `DataStream`; you can refer to [1] for more
details.

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/overview/#example-program

Best,
Shammon FY

On Wed, Jul 26, 2023 at 4:26 PM Kamal Mittal via user <user@flink.apache.org> wrote:
Hello,

Does Flink provide a netty connector for a custom TCP source?

Please share any documentation details.

Rgds,
Kamal


Investigating use of Custom Trigger to smooth CPU usage

2023-07-31 Thread Tucker Harvey via user
Hello Flink community! My team is trying to optimize CPU usage on a running 
job, and we're exploring the option of offsetting the trigger time for smoother 
CPU patterns. Since adjusting the window will compromise job correctness, we 
plan to pursue a custom trigger implementation. We were curious if the 
community had any thoughts or insights on this issue.
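
A minimal sketch of such a trigger, assuming processing-time tumbling windows
(class name hypothetical; an illustration of the Trigger API rather than a
vetted implementation). It spreads each window's firing across [0, maxOffsetMs)
past the window end, so that all windows do not fire - and burn CPU - at the
same instant:

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class OffsetProcessingTimeTrigger extends Trigger<Object, TimeWindow> {

    private final long maxOffsetMs;

    public OffsetProcessingTimeTrigger(long maxOffsetMs) {
        this.maxOffsetMs = maxOffsetMs;
    }

    // Stable per-window offset in [0, maxOffsetMs), derived from the window
    // itself so every element of the window registers the same timer.
    private long offsetFor(TimeWindow window) {
        return Math.floorMod(window.hashCode(), maxOffsetMs);
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp,
                                   TimeWindow window, TriggerContext ctx) {
        ctx.registerProcessingTimeTimer(window.maxTimestamp() + offsetFor(window));
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window,
                                          TriggerContext ctx) {
        return TriggerResult.FIRE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window,
                                     TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) {
        ctx.deleteProcessingTimeTimer(window.maxTimestamp() + offsetFor(window));
    }
}

It would be attached after the window assignment, e.g.
.window(TumblingProcessingTimeWindows.of(Time.minutes(1))).trigger(new
OffsetProcessingTimeTrigger(10_000)); whether FIRE or FIRE_AND_PURGE is right
depends on the job's semantics.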




Re: Parallelism under reactive scaling with slot sharing groups

2023-07-31 Thread Allen Wang
We were using Flink 1.16. I did some research and it looks like it is fixed
in Flink 1.17.



On Thu, Jul 27, 2023 at 9:43 PM Weihua Hu  wrote:

> Hi,
>
> Which Flink version are you using? I haven't reproduced this issue by
> using the master branch.
>
>
> Best,
> Weihua
>
>
> On Tue, Jul 25, 2023 at 2:56 AM Allen Wang  wrote:
>
>> Hello,
>>
>> Our job has the operators source -> sink -> global committer. We have
>> created two slot sharing groups: one for the source and sink, and one for
>> the global committer. The global committer has a specified max parallelism
>> of 1. No max parallelism is set for the source/sink, while the system-level
>> default parallelism is set to the total number of task slots.
>>
>> After we enabled reactive mode scaling, we found that the parallelism for
>> source/sink calculated by the adaptive scheduler is only half of the total
>> task slots. For example, if we have 8 slots, the derived parallelism is 4
>> meaning source/sink would use only 4 slots, and the global committer uses 1
>> slot. So we have 3 idle slots, which is a waste.
>>
>> Any ideas how we can avoid those idle slots in this setting?
>>
>> Thanks,
>> Allen
>>
>>
>>
>>
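
For readers unfamiliar with the setup being described, a minimal fragment of
the relevant DataStream calls (function and variable names hypothetical):
operators in the same slot sharing group may share task slots, and
setMaxParallelism(1) caps an operator regardless of what the adaptive
scheduler derives for the rest of the job.

SingleOutputStreamOperator<Committable> written = env
        .fromSource(fileSource, WatermarkStrategy.noWatermarks(), "source")
        .slotSharingGroup("pipeline")      // source + sink share slots
        .map(new WriteFilesFn())           // the "sink" stage
        .slotSharingGroup("pipeline");

written
        .map(new GlobalCommitFn())         // the global-committer stage
        .slotSharingGroup("committer")     // isolated in its own group
        .setMaxParallelism(1);             // never scales past one subtask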


Re: Unsubscribe

2023-07-31 Thread liu ron
Hi,

Please send an email to user-unsubscr...@flink.apache.org in order to be
removed from the User mailing list.

Best,
Ron

yunchao li wrote on Mon, Jul 17, 2023 at 09:14:

> Unsubscribe --
>
>>


Re: Unsubscribe

2023-07-31 Thread liu ron
Hi, wang

Please send an email to user-unsubscr...@flink.apache.org in order to be
removed from the User mailing list.

Best,
Ron


wang <24248...@163.com> wrote on Mon, Jul 17, 2023 at 23:10:

> Unsubscribe
>


Re: Unsubscribe

2023-07-31 Thread liu ron
Hi, William

Please send an email to user-unsubscr...@flink.apache.org in order to be
removed from the User mailing list.

Best,
Ron


William Wang wrote on Mon, Jul 17, 2023 at 14:59:

>
>


Re: Flink netty connector for TCP source

2023-07-31 Thread liu ron
Hi, Kamal
Currently, we don't provide a TCP-like source connector in the Flink main
repo. If you need this connector, you can try to implement it yourself by
referring to the FLIP-27 source docs [1].

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/sources/

Best,
Ron

Shammon FY wrote on Thu, Jul 27, 2023 at 11:23:

> I cannot find any information about a netty source on the Flink website,
> and it is not in the connector list [1], so I think it is not supported by
> the Flink community.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/overview/
>
> Best,
> Shammon FY
>
> On Thu, Jul 27, 2023 at 10:53 AM Kamal Mittal 
> wrote:
>
>> Hello Shammon,
>>
>>
>>
>> Yes, I am aware of the socket text stream, but I was wondering whether
>> something like
>> ‘https://github.com/apache/bahir-flink/tree/master/flink-connector-netty’
>> is also supported by Flink?
>>
>>
>>
>> Rgds,
>>
>> Kamal
>>
>>
>>
>> *From:* Shammon FY 
>> *Sent:* 27 July 2023 08:15 AM
>> *To:* Kamal Mittal 
>> *Cc:* user@flink.apache.org
>> *Subject:* Re: Flink netty connector for TCP source
>>
>>
>>
>> Hi Kamal,
>>
>>
>>
>> There's a socket text stream in `DataStream`; you can refer to [1] for
>> more details.
>>
>>
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/overview/#example-program
>>
>>
>>
>> Best,
>>
>> Shammon FY
>>
>>
>>
>> On Wed, Jul 26, 2023 at 4:26 PM Kamal Mittal via user <
>> user@flink.apache.org> wrote:
>>
>> Hello,
>>
>>
>>
>> Does Flink provide a netty connector for a custom TCP source?
>>
>>
>>
>> Please share any documentation details.
>>
>>
>>
>> Rgds,
>>
>> Kamal
>>
>>


Re: Metrics are not reported in Python UDF (used inside FlinkSQL) when exception is raised

2023-07-31 Thread liu ron
Hi, Elkhan

I have one question: which metrics are not reported?

Best,
Ron

Elkhan Dadashov wrote on Fri, Jul 28, 2023 at 05:46:

> Hi Flinkers,
>
> Wanted to check if anyone else has faced this issue before:
>
> When a Python UDF (used inside Flink SQL) raises an exception, the metrics
> get lost and are not reported. I am facing this issue in both Flink 1.16.2
> and Flink 1.17.1 (Python 3.9).
> If an exception is not raised, the metrics show up.
>
> The Flink documentation does not mention that UDFs should not throw
> exceptions. Is that the case?
>
> Or is it a known issue/bug?
>
> Thank you.
>
> ===
> FlinkSQL script content:
> ===
>
> CREATE TABLE input_table (
> price DOUBLE
> ) WITH (
> 'connector' = 'datagen',
> 'rows-per-second' = '1'
> );
>
> CREATE TABLE output_table WITH ('connector' = 'print')
> LIKE input_table (EXCLUDING ALL);
>
> CREATE FUNCTION myDivide AS 'custom_udf.divide_udf'
> LANGUAGE PYTHON;
>
> -- Fail scenario: ZeroDivisionError: division by zero
> INSERT INTO output_table (SELECT myDivide(price, 0) FROM input_table);
>
> ===
> Python UDF content:
> ===
>
> from pyflink.table.udf import ScalarFunction, udf
> from pyflink.table import DataTypes
>
> import logging
>
> class DivideUDF(ScalarFunction):
>     def __init__(self):
>         self.success_counter = None
>         self.fail_counter = None
>
>     def open(self, function_context):
>         self.success_counter = function_context.get_metric_group().counter("flinksql_custom_udf_success_metric")
>         self.fail_counter = function_context.get_metric_group().counter("flinksql_custom_udf_fail_metric")
>
>     def eval(self, x, y):
>         logging.info('executing custom udf with logging and metric example...')
>         try:
>             result = x / y
>             self.success_counter.inc()
>             return result
>         except Exception as e:
>             self.fail_counter.inc()
>             raise e
>
> divide_udf = udf(DivideUDF(), result_type=DataTypes.DOUBLE())
>
> ===
> Exception stack trace:
> ===
>
> 2023-07-26 18:17:20
> org.apache.flink.runtime.taskmanager.AsynchronousException: Caught exception while processing timer.
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1575)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1550)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1704)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$22(StreamTask.java:1693)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:838)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:787)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
>     at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: TimerException{java.lang.RuntimeException: Error while waiting for BeamPythonFunctionRunner flush}
>     ... 15 more
> Caused by: java.lang.RuntimeException: Error while waiting for BeamPythonFunctionRunner flush
>     at org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator.invokeFinishBundle(AbstractExternalPythonFunctionOperator.java:107)
>     at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByTime(AbstractPythonFunctionOperator.java:300)
>     at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.lambda$open$0(AbstractPythonFunctionOperator.java:118)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1702)
>     ... 14 more
> Caused by: java.lang.RuntimeException: Failed to close remote bundle
>     at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:423)
>     at org.apache.flink.streaming.api.runners.py

Re: Flink sql client doesn't work with "partition by" clause

2023-07-31 Thread liu ron
Hi, dongwoo

You can check the SqlClient log first; I think the exception has been
logged in the SqlClient log file.

Best,
Ron

Dongwoo Kim wrote on Mon, Jul 31, 2023 at 22:38:

>
>
> -- Forwarded message -
> From: Dongwoo Kim
> Date: Mon, Jul 31, 2023, 11:36 PM
> Subject: Re: Flink sql client doesn't work with "partition by" clause
> To: liu ron
>
>
> Hi, ron.
>
> Actually, I'm not receiving any exception message when executing the
> *partition by* clause in the Flink SQL Client.
>
> The job does not fail, but it finishes quickly without executing the
> expected query.
>
> I suspect that the Flink SQL Client is not recognizing the partition-by
> field (*`hour`*) properly.
>
> This is because when I input an obviously incorrect field (*hourr*) as the
> partition-by field, the job behaves in the same manner - it does not fail,
> but it also doesn't perform any operations (reading files) and ends the query.
>
>
> *e.g.)* The query below also doesn't fail, but it ends right after submission.
>
> CREATE TABLE source_table
> (
> id STRING,
> status STRING,
> type   STRING,
> hourr INT
> ) PARTITIONED BY (`hourr`) WITH (
>   'connector' = 'filesystem',
>   'path' = 'hdfs://${our_data_path}month=202307/day=20230714',
>   'format' = 'parquet'
>   );
>
> SELECT hourr
> FROM source_table
> GROUP BY hourr;
>
>
> To provide better context:
> I was using this sql-runner
> 
> to test the partition by clause, and it worked as expected but didn't work
> when I deployed the Flink session cluster via the Flink Kubernetes operator
> and executed sql-client.sh in the jobmanager pod.
>
> Since, as you said, this is not expected behavior, I'll investigate the
> source code.
>
> Thanks
>
>
> Best,
> dongwoo
>
>
>
> On Mon, Jul 31, 2023 at 5:47 PM, liu ron wrote:
>
>> Hi, dongwoo
>>
>> Can you share the SqlClient exception message? It would help to find the
>> root cause. In theory, it should work for both cases.
>>
>> Best,
>> Ron
>>
>> Dongwoo Kim wrote on Fri, Jul 28, 2023 at 21:24:
>>
>>> Hello all, I've realized that the previous mail had some error which
>>> caused invisible text. So I'm resending the mail below.
>>>
>>> Hello all, I have found that the Flink sql client doesn't work with the
>>> *"partition by"* clause.
>>> Is this a bug?
>>> It's a bit weird, since when I execute the same sql with
>>> *"tableEnv.executeSql(statement)"* it works as expected.
>>> Has anyone tackled this kind of issue?
>>> I have tested on Flink version 1.16.1.
>>>
>>> Thanks in advance
>>>
>>>
>>>
>>> *- The code below only works with the executeSql method in the table api,
>>> not with the sql client cli.*
>>>
>>> CREATE TABLE source_table
>>> (
>>> id STRING,
>>> status STRING,
>>> type   STRING,
>>> `hour` INT
>>> ) PARTITIONED BY (`hour`) WITH (
>>>   'connector' = 'filesystem',
>>>   'path' = 'hdfs://${our_data_path}month=202307/day=20230714',
>>>   'format' = 'parquet'
>>>   );
>>>
>>> SELECT `hour`
>>> FROM source_table
>>> GROUP BY `hour`;
>>>
>>>
>>> *- The query below works both with the executeSql() method in the table
>>> api and in the sql client.*
>>>
>>> CREATE TABLE source_table_2
>>> (
>>> id   STRING,
>>> status   STRING,
>>> type STRING
>>> ) WITH (
>>>   'connector' = 'filesystem',
>>>   'path' = 'hdfs://${out_data_path}/month=202307/day=20230714',
>>>   'format' = 'parquet'
>>>   );
>>>
>>> SELECT status
>>> FROM source_table_2
>>> GROUP BY status;
>>>
>>>
>>>
>>> Best,
>>> dongwoo
>>>
>>> On Fri, Jul 28, 2023 at 6:19 PM, Dongwoo Kim wrote:
>>>
 Hello all, I have found that the flink sql client doesn't work with the
 "partition by" clause.
 Is this a bug? It's a bit weird since when I execute the same sql with
 tableEnv.executeSql(statement) it works as expected. Has anyone
 tackled this kind of issue? I have tested on Flink version 1.16.1.
 Thanks in advance


 - The code below only works with the executeSql method in the table api,
 not with the sql client cli.

 CREATE TABLE source_table
 (
     id     STRING,
     status STRING,
     type   STRING,
     `hour` INT
 ) PARTITIONED BY (`hour`) WITH (
     'connector' = 'filesystem',
     'path' = 'hdfs://${our_data_path}month=202307/day=20230714',
     'format' = 'parquet'
 );

 SELECT `hour`
 FROM source_table
 GROUP BY `hour`;


 - The query below works both with the executeSql method in the table api
 and in the sql client.

 CREATE TABLE source_table_2
 (
     id     STRING,
     status STRING,
     type   STRING
 ) WITH (
     'connector' = 'filesystem',
     'path' = 'hd

Fwd: Flink sql client doesn't work with "partition by" clause

2023-07-31 Thread Dongwoo Kim
-- Forwarded message -
From: Dongwoo Kim
Date: Mon, Jul 31, 2023, 11:36 PM
Subject: Re: Flink sql client doesn't work with "partition by" clause
To: liu ron


Hi, ron.

Actually, I'm not receiving any exception message when executing the
*partition by* clause in the Flink SQL Client.

The job does not fail, but it finishes quickly without executing the
expected query.

I suspect that the Flink SQL Client is not recognizing the partition-by
field (*`hour`*) properly.

This is because when I input an obviously incorrect field (*hourr*) as the
partition-by field, the job behaves in the same manner - it does not fail,
but it also doesn't perform any operations (reading files) and ends the query.

*e.g.)* The query below also doesn't fail, but it ends right after submission.

CREATE TABLE source_table
(
id STRING,
status STRING,
type   STRING,
hourr INT
) PARTITIONED BY (`hourr`) WITH (
  'connector' = 'filesystem',
  'path' = 'hdfs://${our_data_path}month=202307/day=20230714',
  'format' = 'parquet'
  );

SELECT hourr
FROM source_table
GROUP BY hourr;


To provide better context:
I was using this sql-runner

to test the partition by clause, and it worked as expected but didn't work
when I deployed the Flink session cluster via the Flink Kubernetes operator
and executed sql-client.sh in the jobmanager pod.

Since, as you said, this is not expected behavior, I'll investigate the
source code.

Thanks


Best,
dongwoo



On Mon, Jul 31, 2023 at 5:47 PM, liu ron wrote:

> Hi, dongwoo
>
> Can you share the SqlClient exception message? It would help to find the
> root cause. In theory, it should work for both cases.
>
> Best,
> Ron
>
> Dongwoo Kim wrote on Fri, Jul 28, 2023 at 21:24:
>
>> Hello all, I've realized that the previous mail had some error which
>> caused invisible text. So I'm resending the mail below.
>>
>> Hello all, I have found that the Flink sql client doesn't work with the
>> *"partition by"* clause.
>> Is this a bug?
>> It's a bit weird, since when I execute the same sql with
>> *"tableEnv.executeSql(statement)"* it works as expected.
>> Has anyone tackled this kind of issue?
>> I have tested on Flink version 1.16.1.
>>
>> Thanks in advance
>>
>>
>>
>> *- The code below only works with the executeSql method in the table api,
>> not with the sql client cli.*
>>
>> CREATE TABLE source_table
>> (
>> id STRING,
>> status STRING,
>> type   STRING,
>> `hour` INT
>> ) PARTITIONED BY (`hour`) WITH (
>>   'connector' = 'filesystem',
>>   'path' = 'hdfs://${our_data_path}month=202307/day=20230714',
>>   'format' = 'parquet'
>>   );
>>
>> SELECT `hour`
>> FROM source_table
>> GROUP BY `hour`;
>>
>>
>> *- The query below works both with the executeSql() method in the table
>> api and in the sql client.*
>>
>> CREATE TABLE source_table_2
>> (
>> id   STRING,
>> status   STRING,
>> type STRING
>> ) WITH (
>>   'connector' = 'filesystem',
>>   'path' = 'hdfs://${out_data_path}/month=202307/day=20230714',
>>   'format' = 'parquet'
>>   );
>>
>> SELECT status
>> FROM source_table_2
>> GROUP BY status;
>>
>>
>>
>> Best,
>> dongwoo
>>
>> On Fri, Jul 28, 2023 at 6:19 PM, Dongwoo Kim wrote:
>>
>>> Hello all, I have found that the flink sql client doesn't work with the
>>> "partition by" clause.
>>> Is this a bug? It's a bit weird since when I execute the same sql with
>>> tableEnv.executeSql(statement) it works as expected. Has anyone
>>> tackled this kind of issue? I have tested on Flink version 1.16.1.
>>> Thanks in advance
>>>
>>>
>>> - The code below only works with the executeSql method in the table api,
>>> not with the sql client cli.
>>>
>>> CREATE TABLE source_table
>>> (
>>>     id     STRING,
>>>     status STRING,
>>>     type   STRING,
>>>     `hour` INT
>>> ) PARTITIONED BY (`hour`) WITH (
>>>     'connector' = 'filesystem',
>>>     'path' = 'hdfs://${our_data_path}month=202307/day=20230714',
>>>     'format' = 'parquet'
>>> );
>>>
>>> SELECT `hour`
>>> FROM source_table
>>> GROUP BY `hour`;
>>>
>>>
>>> - The query below works both with the executeSql method in the table api
>>> and in the sql client.
>>>
>>> CREATE TABLE source_table_2
>>> (
>>>     id     STRING,
>>>     status STRING,
>>>     type   STRING
>>> ) WITH (
>>>     'connector' = 'filesystem',
>>>     'path' = 'hdfs://${out_data_path}/month=202307/day=20230714',
>>>     'format' = 'parquet'
>>> );
>>>
>>> SELECT status
>>> FROM source_table_2
>>> GROUP BY status;
>>>
>>>
>>>
>>> Best,
>>>
>>> dongwoo
>>>
>>>
>>>


Re: Logs of Kafka payload in Table Api connector of Apache Flink

2023-07-31 Thread liu ron
Hi, elakiya

If you want to log the deserialized Avro record, you should modify the code
at [1].

1.
https://github.com/apache/flink/blob/70e635983dc9aa4bd772042f34a04099256a8750/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchema.java#L81

Best,
Ron



Hang Ruan wrote on Thu, Jul 20, 2023 at 19:53:

> Hi, elakiya.
>
> If you want to log information about the Kafka records, you can add some
> logs in KafkaRecordEmitter.
> If you want information about the deserialized value, you should add logs
> in the Avro format.
>
> Best,
> Hang
>
elakiya udhayanan wrote on Wed, Jul 19, 2023 at 19:44:
>
>> Hi Team,
>>
>> I am using the upsert-kafka Table API connector of Apache Flink to consume
>> events from a Kafka topic, and I want to log the payloads that are
>> consumed. Is there a way to log them?
>>
>> My code looks as below:
>>
>> EnvironmentSettings settings = 
>> EnvironmentSettings.newInstance().inStreamingMode().build();
>> TableEnvironment tEnv = TableEnvironment.create(settings);
>> String statement = "CREATE TABLE Employee (\r\n" +
>> "  employee  ROW(id STRING, name STRING\r\n" +
>> "  ),\r\n" +
>> "  PRIMARY KEY (employeeId) NOT ENFORCED\r\n" +
>> ") WITH (\r\n" +
>> "  'connector' = 'upsert-kafka',\r\n" +
>> "  'topic' = 'employee',\r\n" +
>> "  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
>> "  'key.format' = 'raw',\r\n" +
>> "  'value.format' = 'avro-confluent',\r\n" +
>> "  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081',\r\n" 
>> +
>> ")";
>> tEnv.executeSql(statement);
>>
>> I added log4j.properties to enable logging, but it did not work. Any help
>> is appreciated.
>>
>
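
As a hedged pointer for anyone attempting the same: Flink's distributions use
log4j2, so after adding a LOG.debug(...) call where [1] points (or in
KafkaRecordEmitter, per Hang's earlier note), a logger entry along these lines
in conf/log4j.properties should surface it (the logger id "avroformat" is an
arbitrary label):

  logger.avroformat.name = org.apache.flink.formats.avro
  logger.avroformat.level = DEBUG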


Re: Kafka Exception

2023-07-31 Thread liu ron
Hi, Kenan

After studying the source code and searching Google for related
information, I think this is caused by a duplicate client_id [1]. You can
check whether other jobs use the same group_id when consuming this topic.
The group_id is used in Flink to assemble the client_id [2]; if there are
already jobs using the same group_id, the duplicated client_id will be
detected on the Kafka side.

[1]
https://stackoverflow.com/questions/40880832/instancealreadyexistsexception-coming-from-kafka-consumer
[2]
https://github.com/apache/flink-connector-kafka/blob/79ae2d70499f81ce489911956c675354657dd44f/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java#L466

Best,
Ron

Kenan Kılıçtepe wrote on Tue, Jul 25, 2023 at 21:48:

>
>
>
>
> Any help is appreciated regarding the exception below.
> Also, my KafkaSource code is below. The parallelism is 16 for this task.
>
> KafkaSource<String> sourceStationsPeriodic = KafkaSource.<String>builder()
>         .setBootstrapServers(parameter.get(KAFKA_SOURCE_STATIONS_BOOTLOADER_PROPERTY))
>         .setTopics(parameter.get(KAFKA_TOPIC_READ_WIFI))
>         .setGroupId(parameter.get(KAFKA_GROUP))
>         .setStartingOffsets(OffsetsInitializer.latest())
>         .setValueOnlyDeserializer(new SimpleStringSchema())
>         .build();
>
> // Our Kafka Source
> KafkaSource<String> sourceStationsWifiInterface = KafkaSource.<String>builder()
>         .setBootstrapServers(parameter.get(KAFKA_SOURCE_STATIONS_BOOTLOADER_PROPERTY))
>         .setTopics(parameter.get(KAFKA_TOPIC_READ_WIFI_INTERFACE))
>         .setGroupId(parameter.get(KAFKA_GROUP))
>         .setStartingOffsets(OffsetsInitializer.latest())
>         .setValueOnlyDeserializer(new SimpleStringSchema())
>         .build();
>
> KafkaSource<String> sourceTwinMessage = KafkaSource.<String>builder()
>         .setBootstrapServers(parameter.get(KAFKA_SOURCE_STATIONS_BOOTLOADER_PROPERTY))
>         .setTopics(parameter.get(KAFKA_TOPIC_READ_TWIN_MESSAGE))
>         .setGroupId(parameter.get(KAFKA_GROUP))
>         .setStartingOffsets(OffsetsInitializer.latest())
>         .setValueOnlyDeserializer(new SimpleStringSchema())
>         .build();
>
> KafkaSource<String> sourceStationsOnDemand = KafkaSource.<String>builder()
>         .setBootstrapServers(parameter.get(KAFKA_SOURCE_STATIONS_BOOTLOADER_PROPERTY))
>         .setTopics(parameter.get(KAFKA_TOPIC_READ_STATIONS_ON_DEMAND))
>         .setGroupId(parameter.get(KAFKA_GROUP))
>         .setStartingOffsets(OffsetsInitializer.latest())
>         .setValueOnlyDeserializer(new SimpleStringSchema())
>         .build();
>
> KafkaSource<String> sourceDeviceInfo = KafkaSource.<String>builder()
>         .setBootstrapServers(parameter.get(KAFKA_SOURCE_STATIONS_BOOTLOADER_PROPERTY))
>         .setTopics(parameter.get(KAFKA_TOPIC_READ_DEVICE_INFO))
>         .setGroupId(parameter.get(KAFKA_GROUP))
>         .setStartingOffsets(OffsetsInitializer.latest())
>         .setValueOnlyDeserializer(new SimpleStringSchema())
>         .build();
>
>
>
> 2023-07-23 07:06:24,927 WARN  org.apache.kafka.common.utils.AppInfoParser
>  [] - Error registering AppInfo mbean
> javax.management.InstanceAlreadyExistsException:
> kafka.admin.client:type=app-info,id=wifialgogroup1-enumerator-admin-client
>     at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:436) ~[?:?]
>     at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1855) ~[?:?]
>     at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:955) ~[?:?]
>     at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:890) ~[?:?]
>     at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:320) ~[?:?]
>     at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) ~[?:?]
>     at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64) ~[blob_p-7f823076a9b41619d082

Re: Suggestions for Open Source FLINK SQL editor

2023-07-31 Thread liu ron
Hi Rajat,

I also think Zeppelin may be a good choice.

Best,
Ron

Guanghui Zhang wrote on Fri, Jul 28, 2023 at 20:45:

> Hi Guozhen, our team also uses Flink as an ad-hoc query engine. Can we talk
> about it?
>
> Guozhen Yang wrote on Thu, Jul 20, 2023 at 11:58:
>
>> Hi Rajat,
>>
>> We are using apache zeppelin as our entry point for submitting flink
>> ad-hoc queries (and spark jobs actually).
>>
>> It supports interactive queries, data visualization, multiple data query
>> engines, multiple auth models. You can check out other features on its
>> official website.
>>
>> But because of the inactivity of the apache zeppelin community (the last
>> stable release was a year and a half ago), we need to do a bit of custom
>> development and bug fixing on its master branch.
>>
>> On 2023/07/19 16:47:43 Rajat Ahuja wrote:
>> > Hi team,
>> >
>> > I have set up a session cluster on k8s via sql gateway. I am looking for
>> > an open source Flink sql editor that can submit sql queries on top of the
>> > k8s session cluster. Any suggestions for a sql editor to submit queries?
>> >
>> >
>> > Thanks
>> >
>>
>


Re: Java 17 for Flink 1.17 supported?

2023-07-31 Thread Tamir Sagi
Hey,

The JDK 17 support epic [1] is closed and appears to be delivered with Flink
1.18.

[1] https://issues.apache.org/jira/browse/FLINK-15736

From: patricia lee 
Sent: Monday, July 31, 2023 11:24 AM
To: user@flink.apache.org 
Subject: Java 17 for Flink 1.17 supported?



Hi,

I was advised to upgrade the JDK of our Flink 1.17 to 17. However, in the
documentation it only says, in bold, "Java 11".

Will Java 17 support start with the Flink 1.18 release?

Thanks



Java 17 for Flink 1.17 supported?

2023-07-31 Thread patricia lee
Hi,

I was advised to upgrade the JDK of our Flink 1.17 to 17. However, in the
documentation it only says, in bold, "Java 11".

Will Java 17 support start with the Flink 1.18 release?

Thanks