[jira] [Created] (FLINK-30910) ApplicationDispatcherBootstrapTest.testApplicationIsStoppedWhenStoppingBootstrap fails with assertion

2023-02-05 Thread Matthias Pohl (Jira)
Matthias Pohl created FLINK-30910:
-

 Summary: 
ApplicationDispatcherBootstrapTest.testApplicationIsStoppedWhenStoppingBootstrap
 fails with assertion
 Key: FLINK-30910
 URL: https://issues.apache.org/jira/browse/FLINK-30910
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.16.1
Reporter: Matthias Pohl


A build failure in 
{{ApplicationDispatcherBootstrapTest.testApplicationIsStoppedWhenStoppingBootstrap}}:
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=45722&view=logs&j=0da23115-68bb-5dcd-192c-bd4c8adebde1&t=24c3384f-1bcb-57b3-224f-51bf973bbee8&l=9831

{code}
Feb 05 01:13:44 [ERROR] Tests run: 30, Failures: 1, Errors: 0, Skipped: 0, Time 
elapsed: 5.174 s <<< FAILURE! - in 
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrapTest
Feb 05 01:13:44 [ERROR] 
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrapTest.testApplicationIsStoppedWhenStoppingBootstrap
  Time elapsed: 2.026 s  <<< FAILURE!
Feb 05 01:13:44 org.opentest4j.AssertionFailedError: 
Feb 05 01:13:44 
Feb 05 01:13:44 Expecting value to be true but was false
Feb 05 01:13:44 at 
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
Feb 05 01:13:44 at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
Feb 05 01:13:44 at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
Feb 05 01:13:44 at 
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrapTest.testApplicationIsStoppedWhenStoppingBootstrap(ApplicationDispatcherBootstrapTest.java:361)
[...]
{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30909) Running HA (hashmap, sync) end-to-end test

2023-02-05 Thread Matthias Pohl (Jira)
Matthias Pohl created FLINK-30909:
-

 Summary: Running HA (hashmap, sync) end-to-end test
 Key: FLINK-30909
 URL: https://issues.apache.org/jira/browse/FLINK-30909
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing, Runtime / Coordination
Affects Versions: 1.15.3
Reporter: Matthias Pohl


A build failure in {{Running HA (hashmap, sync) end-to-end test}} appeared:
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=45721&view=logs&j=e9d3d34f-3d15-59f4-0e3e-35067d100dfe&t=f8a6d3eb-38cf-5cca-9a99-d0badeb5fe62&l=13213

{code}
Feb 05 02:52:21 FAILURE: A JM did not execute the job.
Feb 05 02:52:21 One or more tests FAILED.
Feb 05 02:52:21 Stopping job timeout watchdog (with pid=304915)
Feb 05 02:52:21 Killing JM watchdog @ 306748
Feb 05 02:52:21 Killing TM watchdog @ 307208
Feb 05 02:52:21 [FAIL] Test script contains errors.
[...]
{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30908) Fatal error in ResourceManager caused YARNSessionFIFOSecuredITCase.testDetachedMode to fail

2023-02-05 Thread Matthias Pohl (Jira)
Matthias Pohl created FLINK-30908:
-

 Summary: Fatal error in ResourceManager caused 
YARNSessionFIFOSecuredITCase.testDetachedMode to fail
 Key: FLINK-30908
 URL: https://issues.apache.org/jira/browse/FLINK-30908
 Project: Flink
  Issue Type: Bug
  Components: Deployment / YARN, Runtime / Coordination
Affects Versions: 1.17.0
Reporter: Matthias Pohl


There's a build failure in {{YARNSessionFIFOSecuredITCase.testDetachedMode}} 
which is caused by a fatal error in the ResourceManager:
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=45720&view=logs&j=245e1f2e-ba5b-5570-d689-25ae21e5302f&t=d04c9862-880c-52f5-574b-a7a79fef8e0f&l=29869

{code}
Feb 05 02:41:58 java.io.InterruptedIOException: Interrupted waiting to send RPC 
request to server
Feb 05 02:41:58 java.io.InterruptedIOException: Interrupted waiting to send RPC 
request to server
Feb 05 02:41:58 at org.apache.hadoop.ipc.Client.call(Client.java:1480) 
~[hadoop-common-3.2.3.jar:?]
Feb 05 02:41:58 at org.apache.hadoop.ipc.Client.call(Client.java:1422) 
~[hadoop-common-3.2.3.jar:?]
Feb 05 02:41:58 at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:118)
 ~[hadoop-common-3.2.3.jar:?]
Feb 05 02:41:58 at com.sun.proxy.$Proxy31.allocate(Unknown Source) 
~[?:?]
Feb 05 02:41:58 at 
org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.allocate(ApplicationMasterProtocolPBClientImpl.java:77)
 ~[hadoop-yarn-common-3.2.3.jar:?]
Feb 05 02:41:58 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method) ~[?:1.8.0_292]
Feb 05 02:41:58 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
~[?:1.8.0_292]
Feb 05 02:41:58 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:1.8.0_292]
Feb 05 02:41:58 at java.lang.reflect.Method.invoke(Method.java:498) 
~[?:1.8.0_292]
Feb 05 02:41:58 at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
 ~[hadoop-common-3.2.3.jar:?]
Feb 05 02:41:58 at 
org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
 ~[hadoop-common-3.2.3.jar:?]
Feb 05 02:41:58 at 
org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
 ~[hadoop-common-3.2.3.jar:?]
Feb 05 02:41:58 at 
org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
 ~[hadoop-common-3.2.3.jar:?]
Feb 05 02:41:58 at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
 ~[hadoop-common-3.2.3.jar:?]
Feb 05 02:41:58 at com.sun.proxy.$Proxy32.allocate(Unknown Source) 
~[?:?]
Feb 05 02:41:58 at 
org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.allocate(AMRMClientImpl.java:325)
 ~[hadoop-yarn-client-3.2.3.jar:?]
Feb 05 02:41:58 at 
org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$HeartbeatThread.run(AMRMClientAsyncImpl.java:311)
 [hadoop-yarn-client-3.2.3.jar:?]
Feb 05 02:41:58 Caused by: java.lang.InterruptedException
Feb 05 02:41:58 at 
java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404) ~[?:1.8.0_292]
Feb 05 02:41:58 at 
java.util.concurrent.FutureTask.get(FutureTask.java:191) ~[?:1.8.0_292]
Feb 05 02:41:58 at 
org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:1180) 
~[hadoop-common-3.2.3.jar:?]
Feb 05 02:41:58 at org.apache.hadoop.ipc.Client.call(Client.java:1475) 
~[hadoop-common-3.2.3.jar:?]
Feb 05 02:41:58 ... 17 more
{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30907) HiveSinkCompactionITCase.testSingleParallelism fails with test timeout

2023-02-05 Thread Matthias Pohl (Jira)
Matthias Pohl created FLINK-30907:
-

 Summary: HiveSinkCompactionITCase.testSingleParallelism fails with 
test timeout
 Key: FLINK-30907
 URL: https://issues.apache.org/jira/browse/FLINK-30907
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Hive
Affects Versions: 1.15.3
Reporter: Matthias Pohl


We experienced a test failure in 
{{HiveSinkCompactionITCase.testSingleParallelism}} due to a 
{{TestTimedOutException}}:
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=45708&view=logs&j=fc5181b0-e452-5c8f-68de-1097947f6483&t=995c650b-6573-581c-9ce6-7ad4cc038461&l=26356

{code}
Feb 04 01:56:55 [ERROR] Tests run: 6, Failures: 0, Errors: 1, Skipped: 0, Time 
elapsed: 187.918 s <<< FAILURE! - in 
org.apache.flink.connectors.hive.HiveSinkCompactionITCase
Feb 04 01:56:55 [ERROR] HiveSinkCompactionITCase.testSingleParallelism  Time 
elapsed: 90.009 s  <<< ERROR!
Feb 04 01:56:55 org.junit.runners.model.TestTimedOutException: test timed out 
after 90 seconds
Feb 04 01:56:55 at java.io.FileDescriptor.sync(Native Method)
Feb 04 01:56:55 at 
org.apache.derby.impl.io.DirRandomAccessFile.sync(Unknown Source)
[...]
{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30906) TwoInputStreamTask passes wrong configuration object when creating input processor

2023-02-05 Thread Yun Gao (Jira)
Yun Gao created FLINK-30906:
---

 Summary: TwoInputStreamTask passes wrong configuration object when 
creating input processor
 Key: FLINK-30906
 URL: https://issues.apache.org/jira/browse/FLINK-30906
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.16.1, 1.17.0
Reporter: Yun Gao


It seems {{StreamTwoInputProcessorFactory.create}} is passed the wrong 
configuration object: the taskManagerConfiguration argument should be 
{{getEnvironment().getTaskManagerInfo().getConfiguration()}}.
 
In the subsequent logic, the factory does indeed try to load TaskManager 
options from this configuration object, such as the state backend and 
{{taskmanager.memory.managed.consumer-weights}}.
 

[1]https://github.com/apache/flink/blob/111342f37bdc0d582d3f7af458d9869f0548299f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java#L98
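
To illustrate the failure mode (illustrative only; this is not the actual 
factory code): TaskManager-level options live in the TaskManager 
configuration, so looking them up in the per-task {{Configuration}} silently 
yields defaults.

{code}
// Self-contained sketch of the bug class described above (not Flink's
// internal code): reading a TaskManager-level option from the wrong
// Configuration object silently falls back to the default value.
import org.apache.flink.configuration.Configuration;

public class ConfigScopeDemo {
    public static void main(String[] args) {
        // Configuration the TaskManager was started with:
        Configuration taskManagerConfig = new Configuration();
        taskManagerConfig.setString(
                "taskmanager.memory.managed.consumer-weights",
                "OPERATOR:70,STATE_BACKEND:30");

        // Per-task configuration, which lacks TaskManager-level options:
        Configuration taskConfig = new Configuration();

        System.out.println(taskConfig.getString(
                "taskmanager.memory.managed.consumer-weights", "<default>"));
        // prints "<default>" -- the option is invisible in this scope

        System.out.println(taskManagerConfig.getString(
                "taskmanager.memory.managed.consumer-weights", "<default>"));
        // prints "OPERATOR:70,STATE_BACKEND:30"
    }
}
{code}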



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30905) doc generation fails with "concurrent map read and map write"

2023-02-05 Thread Matthias Pohl (Jira)
Matthias Pohl created FLINK-30905:
-

 Summary: doc generation fails with "concurrent map read and map 
write"
 Key: FLINK-30905
 URL: https://issues.apache.org/jira/browse/FLINK-30905
 Project: Flink
  Issue Type: Bug
  Components: Documentation, Test Infrastructure
Affects Versions: 1.16.1, 1.15.3, 1.17.0
Reporter: Matthias Pohl


We experienced a build failure on {{master}} (but since this looks like a Hugo 
issue, I added the already-released versions to the affected versions as well) 
with a {{concurrent map read and map write}} fatal error within Hugo:
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=45707&view=logs&j=6dc02e5c-5865-5c6a-c6c5-92d598e3fc43&t=ddd6d61a-af16-5d03-2b9a-76a279badf98

{code}
Start building sites … 
fatal error: concurrent map read and map write

goroutine 233 [running]:
runtime.throw(0x23054e4, 0x21)
/usr/local/go/src/runtime/panic.go:1116 +0x72 fp=0xc0016ea860 
sp=0xc0016ea830 pc=0x4f5ff2
runtime.mapaccess1_faststr(0x1f71280, 0xc000764a20, 0xc000aa60e1, 0x18, 0xcd)
/usr/local/go/src/runtime/map_faststr.go:21 +0x465 fp=0xc0016ea8d0 
sp=0xc0016ea860 pc=0x4d29c5
[...]
{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30904) Update the documentation and configuration description of slow task detector

2023-02-05 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-30904:
---

 Summary: Update the documentation and configuration description of 
slow task detector
 Key: FLINK-30904
 URL: https://issues.apache.org/jira/browse/FLINK-30904
 Project: Flink
  Issue Type: Improvement
  Components: Documentation, Runtime / Configuration
 Environment: FLINK-30707 improved slow task detection. The 
previous documentation and configuration descriptions of SlowTaskDetector need 
to be updated accordingly.
Reporter: Zhu Zhu
 Fix For: 1.17.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] FLIP-208: Add RecordEvaluator to dynamically stop source based on de-serialized records

2023-02-05 Thread Dong Lin
As I am reviewing the FLIP-208 PR, I realized that a new public API
change was added to the Proposed Changes section in a later
modification, after this voting thread started, without being mentioned in
this thread.

I have moved this change to the Public Interface section now. The change is
that "a new connector option 'scan.record.evaluator.class' will be added to
provide the custom RecordEvaluator class".
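
For reference, here is a minimal sketch of what a custom evaluator could look
like (the interface shape follows FLIP-208; the exact package and wiring may
differ in the merged PR):

// Hedged sketch of a FLIP-208 RecordEvaluator; package and signature are
// taken from the FLIP text and may differ in the final code.
import org.apache.flink.connector.base.source.reader.RecordEvaluator;

public class SentinelRecordEvaluator implements RecordEvaluator<String> {
    @Override
    public boolean isEndOfStream(String record) {
        // Stop consuming from a split once a sentinel record is seen.
        return "END".equals(record);
    }
}

Such a class would then be referenced via 'scan.record.evaluator.class' =
'com.example.SentinelRecordEvaluator' (class name hypothetical).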

Since this change is relatively minor and looks good to me, I will re-use
this voting thread to confirm this change is OK.

Qingsheng and Leonard: can you help check whether this public interface
change looks good to you?

I will keep this discussion open for at least 72 hours before merging the
PR.

Thanks,
Dong


On Tue, Dec 27, 2022 at 3:29 PM Leonard Xu  wrote:

> +1 (binding)
>
> Best,
> Leonard
>
>
> > On Dec 26, 2022, at 4:22 PM, Qingsheng Ren  wrote:
> >
> > +1 (binding)
> >
> > Best,
> > Qingsheng
> > Ververica (Alibaba)
> >
> > On Wed, Dec 21, 2022 at 3:13 PM Dong Lin  wrote:
> >
> >> Hi all,
> >>
> >> We would like to start the vote for FLIP-208: Add RecordEvaluator to
> >> dynamically stop source based on de-serialized records [1]. This FLIP
> was
> >> discussed in this thread [2].
> >>
> >> This feature is needed by users who currently depend on
> >> KafkaDeserializationSchema::isEndOfStream() to migrate their Flink job
> from
> >> FlinkKafkaConsumer to KafkaSource. And we won't be able to
> >> remove FlinkKafkaConsumer and FlinkKafkaProducer before adding this
> >> feature.
> >>
> >> Thanks,
> >> Dong
> >>
> >> [1]
> >>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-208%3A+Add+RecordEvaluator+to+dynamically+stop+source+based+on+de-serialized+records
> >> [2] https://lists.apache.org/thread/z87m68ggzkx0s427tmrllswm4l1g7owc
> >>
>
>


[jira] [Created] (FLINK-30903) Fix the maxParallelism of DefaultVertexParallelismAndInputInfosDecider to fall back to default parallelism

2023-02-05 Thread Junrui Li (Jira)
Junrui Li created FLINK-30903:
-

 Summary: Fix the maxParallelism of 
DefaultVertexParallelismAndInputInfosDecider to fall back to default parallelism
 Key: FLINK-30903
 URL: https://issues.apache.org/jira/browse/FLINK-30903
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.17.0
Reporter: Junrui Li
 Fix For: 1.17.0


In FLINK-30684 we marked the vertices that use the default parallelism, and in 
the AdaptiveBatchScheduler we allow users to use parallelism.default as the max 
parallelism if they don't configure 
"execution.batch.adaptive.auto-parallelism.max-parallelism". This issue fixes 
that fallback logic.
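
A minimal sketch of the intended fallback (key names are from this issue; the 
surrounding code is illustrative, not the actual decider implementation):

{code}
// Illustrative only: maxParallelism should fall back to parallelism.default
// when execution.batch.adaptive.auto-parallelism.max-parallelism is not set.
import org.apache.flink.configuration.Configuration;

public class MaxParallelismFallbackDemo {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("parallelism.default", 8);
        // "execution.batch.adaptive.auto-parallelism.max-parallelism"
        // is deliberately left unset here.

        int maxParallelism = conf.getInteger(
                "execution.batch.adaptive.auto-parallelism.max-parallelism",
                conf.getInteger("parallelism.default", 128));

        System.out.println(maxParallelism); // 8: fell back to parallelism.default
    }
}
{code}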

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30902) Partition reuse does not take effect on edges of hybrid selective type

2023-02-05 Thread Weijie Guo (Jira)
Weijie Guo created FLINK-30902:
--

 Summary: Partition reuse does not take effect on edges of hybrid 
selective type
 Key: FLINK-30902
 URL: https://issues.apache.org/jira/browse/FLINK-30902
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.17.0
Reporter: Weijie Guo
Assignee: Weijie Guo
 Fix For: 1.17.0


Partition reuse only takes effect for re-consumable edges, but a hybrid 
selective result partition is not re-consumable. This optimization is very 
important for reducing the cost of the shuffle write phase. In the previous 
implementation, we only force broadcast edges to be hybrid full 
(re-consumable) in the 'ResultPartitionTypeFactory'. As a result, for 
ALL_EXCHANGE_HYBRID_SELECTIVE jobs, partition reuse cannot take effect for 
non-broadcast edges.
In fact, we expect to replace all edges that can be reused with hybrid full 
result partitions.
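
To make the intended choice concrete, a purely illustrative sketch (the reuse 
check is a hypothetical stand-in for the scheduler's actual analysis; only the 
enum constants are real):

{code}
// Illustrative sketch: edges whose partitions can be reused should be
// upgraded to HYBRID_FULL (re-consumable) instead of staying HYBRID_SELECTIVE.
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;

public class PartitionTypeChoiceDemo {
    // Hypothetical stand-in for the scheduler's partition-reuse analysis.
    static boolean canBeReused(boolean consumedMultipleTimes) {
        return consumedMultipleTimes;
    }

    public static void main(String[] args) {
        ResultPartitionType type = canBeReused(true)
                ? ResultPartitionType.HYBRID_FULL       // re-consumable; reuse works
                : ResultPartitionType.HYBRID_SELECTIVE; // not re-consumable
        System.out.println(type);
    }
}
{code}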



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30901) Fix jobVertex parallelismConfigured not accounting for chained sources

2023-02-05 Thread Junrui Li (Jira)
Junrui Li created FLINK-30901:
-

 Summary: Fix jobVertex parallelismConfigured not accounting for 
chained sources
 Key: FLINK-30901
 URL: https://issues.apache.org/jira/browse/FLINK-30901
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.17.0
Reporter: Junrui Li
 Fix For: 1.17.0


When creating the OperatorChainInfo in the StreamingJobGraphGenerator, chained 
sources are not included in the chainedNodes, so they will not be added to the 
chainInfo via addNodeToChain.

This causes the jobVertex's parallelismConfigured property to be wrong. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30900) Introduce utils for table store

2023-02-05 Thread Shammon (Jira)
Shammon created FLINK-30900:
---

 Summary: Introduce utils for table store
 Key: FLINK-30900
 URL: https://issues.apache.org/jira/browse/FLINK-30900
 Project: Flink
  Issue Type: Sub-task
  Components: Table Store
Affects Versions: table-store-0.4.0
Reporter: Shammon


Introduce utils from flink-core for table store



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] Promote SinkV2 to @Public and deprecate SinkFunction

2023-02-05 Thread Steven Wu
Regarding the discussion on global committer [1] for sinks with global
transactions, there is no consensus on solving that problem in SinkV2. Will
it require any breaking change in SinkV2?

Also, will SinkV1 be deprecated too? Or should that happen sometime after the
SinkFunction deprecation?

[1] https://lists.apache.org/thread/82bgvlton9olb591bfg2djv0cshj1bxj

On Sun, Feb 5, 2023 at 2:14 AM Dong Lin  wrote:

> Hi Konstantin,
>
> Thanks for the comment! Please see my comment inline.
>
> Cheers,
> Dong
>
> On Sat, Feb 4, 2023 at 2:06 AM Konstantin Knauf  wrote:
>
> > Hi everyone,
> >
> > sorry for joining the discussion late.
> >
> > 1) Is there an option to deprecate SinkFunction in Flink 1.17 while
> > leaving SinkV2 @PublicEvolving in Flink 1.17. We then aim to make SinkV2
> > @Public and remove SinkFunction in Flink 1.18. @PublicEvolving APIs are
> > intended for public use. So, I don't see it as a blocker for deprecating
> > SinkFunction that we have to make SinkV2 @PublicEvolving first. For
> > reference, this is the description of @PublicEvolving:
> >
> > /**
> >  * Annotation to mark classes and methods for public use, but with
> > evolving interfaces.
> >  *
> >  * Classes and methods with this annotation are intended for public
> > use and have stable behavior.
> >  * However, their interfaces and signatures are not considered to be
> > stable and might be changed
> >  * across versions.
> >  *
> >  * This annotation also excludes methods and classes with evolving
> > interfaces / signatures within
> >  * classes annotated with {@link Public}.
> >  */
> >
> >
> > Marking SinkFunction @Deprecated would already signal everyone to move to
> > SinkV2, which we as a community, I believe, have a strong interest in.
> Its
> >
>
> Yes, I also believe we all have this strong interest. I just hope that this
> can be done in the best possible way that does not confuse users.
>
> I probably still have the same concern regarding its impact on users: if we
> mark an API as deprecated, it effectively means the users of this API
> should start to migrate to another API (e.g. SinkV2) and we might remove
> this API in the future. However, given that we know there are known
> problems preventing users from doing so, it seems that we are not ready to
> send this message to users right now.
>
> If I understand correctly, I guess you are suggesting that by marking
> SinkFunction as deprecated, we can put higher pressure on Flink
> contributors to update the existing Flink codebase to improve and use
> SinkV2.
>
> I am not sure this is the right way to use @deprecated, which has a
> particular meaning for its users rather than contributors. And I am also
> not sure we can even pressure contributors of an open-source project into
> developing a feature (e.g. migrate all existing SinkFunction subclasses to
> SinkV2). IMO, the typical way is for the contributor with interest/time to
> work on the feature, or talk to other contributors whether they are willing
> to collaborate/work on this, rather than pressuring other contributors into
> working on this.
>
>
> almost comical how long the transition from SourceFunction/SinkFunction to
> > Source/Sink takes us. At the same time, we leave ourselves the option to
> > make small changes to SinkV2 if any problems arise during the migration of
> > these connectors.
> >
> > I think, we have a bit of a chicken/egg problem here. The pressure for
> >
>
> Similar to the reason described above, I am not sure we have a chicken/egg
> problem here. The issue here is that SinkV2 is not ready and we have a lot
> of existing SinkFunction that is not migrated by ourselves. We (Flink
> contributors) probably do not need to mark SinkFunction as deprecated in
> order to address these issues in our own codebase.
>
>
> users and contributors is not high enough to move away from SinkFunction as
> > long as it's not deprecated, but at the same time we need people to
> > migrate
> > their connectors to see if there are any gaps in SinkV2. I believe, the
> > combination proposed above could bridge this problem.
> >
> > 2) I don't understand the argument of waiting until some of the
> > implementations are @Public. How can we make the implementations of the
> > SinkV2 API @Public without making SinkV2 @Public? All public methods of
> > SinkV2 are part of every implementation. So to me it actually seems to be
> > opposite: in order to make any of the implementation @Public we first
> need
> > to make the API @Public.
> >
>
> Yeah I also agree with you.
>
>
> >
> > Cheers,
> >
> > Konstantin
> >
> > On Mon, Jan 30, 2023 at 13:18, Dong Lin wrote:
> >
> > > Hi Martijn,
> > >
> > > Thanks for driving this effort to clean-up the Flink codebase!
> > >
> > > I like the idea to cleanup Flink codebase to avoid having two Sinks. On
> > the
> > other hand, I also think the concern mentioned by Jing makes sense. In
> > > addition to thinking in terms of the rule proposed in FLIP-197
> > > <
> > >
> >
> 

Re: Need help how to use Table API to join two Kafka streams

2023-02-05 Thread yuxia
Hi, thanks for reaching out. 
For your question, you don't need to consume data in your consumer class 
separately and then insert it into those tables. The data will be consumed by 
what we implemented here. 

Best regards, 
Yuxia 


From: "Amir Hossein Sharifzadeh" 
To: luoyu...@alumni.sjtu.edu.cn 
Sent: Sunday, February 5, 2023, 6:07:02 AM 
Subject: Re: Need help how to use Table API to join two Kafka streams 

Dear Yuxia, 
Thank you again for your help. I am implementing the code in Python, but I 
still have some confusion about my application. 
As I mentioned before, I am sending two simple messages (JSON) on two different 
topics: 
This is my Kafka producer class: 
import json 
import sys 
from kafka import KafkaProducer 
def serializer (dictionary): 
try : 
message = json.dumps(dictionary) 
except Exception as e: 
sys.stderr.write( str (e) + ' \n ' ) 
message = str (dictionary) 
return message.encode( 'utf8' ) 
def create_sample_json (row_id): 
return { 'row_id' : int (row_id), 'my_data' : str ( int (row_id) + 7 )} 
def do_produce (topic_name): 
producer = KafkaProducer( bootstrap_servers =KAFKA_SERVER, value_serializer 
=serializer) 
for row_id in range ( 1 , 10 ): 
my_data = data_helper.create_sample_json(row_id) 
producer.send(topic_name, my_data) 
producer.flush() 
if __name__ == '__main__' : 
do_produce('topic1') 
do_produce('topic2') 
==
 
As you suggested, this is my Flink consumer that I want to use to consume data 
from the producer and run queries on: 
from pyflink.datastream.stream_execution_environment import StreamExecutionEnvironment
from pyflink.table import EnvironmentSettings
from pyflink.table.expressions import col
from pyflink.table.table_environment import StreamTableEnvironment

from org.varimat.model.com.varimat_constants import EMPAD_TOPIC

KAFKA_SERVERS = 'localhost:9092'

def log_processing():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.add_jars("file:///Users/amir/empad_jar/kafka-clients-3.3.2.jar")
    env.add_jars("file:///Users/amir/empad_jar/flink-connector-kafka-1.16.1.jar")
    env.add_jars("file:///Users/amir/empad_jar/flink-sql-connector-kafka-1.16.1.jar")

    settings = EnvironmentSettings.new_instance() \
        .in_streaming_mode() \
        .build()

    t_env = StreamTableEnvironment.create(stream_execution_environment=env,
                                          environment_settings=settings)

    t1 = f"""
        CREATE TEMPORARY TABLE table1(
            row_id INT,
            row_data STRING
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'topic1',
            'properties.bootstrap.servers' = '{KAFKA_SERVERS}',
            'properties.group.id' = 'MY_GRP',
            'scan.startup.mode' = 'latest-offset',
            'format' = 'json'
        )
    """

    t2 = f"""
        CREATE TEMPORARY TABLE table2(
            row_id INT,
            row_data STRING
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'topic2',
            'properties.bootstrap.servers' = '{KAFKA_SERVERS}',
            'properties.group.id' = 'MY_GRP',
            'scan.startup.mode' = 'latest-offset',
            'format' = 'json'
        )
    """

    t_env.execute_sql(t1)
    t_env.execute_sql(t2)

    t3 = t_env.sql_query("SELECT row_id, row_data AS my_raw_data FROM table2")

    # Please tell me what I should do next. Questions:
    # 1) Do I need to consume data in my consumer class separately and then
    #    insert it into those tables, or will the data be consumed by what we
    #    implemented here (as I passed the connector, topic,
    #    bootstrap.servers, etc.)?
    # 2) If so:
    #    2.1) How can I join those streams in Python?
    #    2.2) How can I avoid reprocessing previous data, as my procedure will
    #         send thousands of messages on each topic? I want to make sure
    #         not to run duplicate queries.
    # 3) If not, what should I do?

Thank you very much. 
Amir 






On Fri, Feb 3, 2023 at 5:45 AM yuxia <luoyu...@alumni.sjtu.edu.cn> wrote: 


Hi, Amir. 
It may look like the following Scala code: 

import org.apache.flink.table.api._ // needed for the $ field syntax

tableEnv.executeSql("CREATE TEMPORARY TABLE s1 (id int, ssn string) WITH ('connector' = 'kafka', ...)")
tableEnv.executeSql("CREATE TEMPORARY TABLE s2 (id int, ssn string) WITH ('connector' = 'kafka', ...)")

// You will need to rename the field you join on; otherwise you'll get
// "org.apache.flink.table.api.ValidationException: Ambiguous column name: ssn".
val t1 = tableEnv.from("s1")
val t3 = tableEnv.sqlQuery("SELECT id, ssn AS ssn1 FROM s2")
val result = t1.join(t3).where($"ssn" === $"ssn1")

Also, you can refer here for more detail [1]. 
[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tableapi/#joins

Best regards, 
Yuxia 

- Original Message - 
From: "Amir Hossein Sharifzadeh" <amirsharifza...@gmail.com> 
To: "dev" <dev@flink.apache.org> 
Sent: Friday, February 3, 2023, 4:45:08 AM 
Subject: Need 

Re: [DISCUSS] Promote SinkV2 to @Public and deprecate SinkFunction

2023-02-05 Thread Dong Lin
Hi Konstantin,

Thanks for the comment! Please see my comment inline.

Cheers,
Dong

On Sat, Feb 4, 2023 at 2:06 AM Konstantin Knauf  wrote:

> Hi everyone,
>
> sorry for joining the discussion late.
>
> 1) Is there an option to deprecate SinkFunction in Flink 1.17 while leaving
> SinkV2 @PublicEvolving in Flink 1.17. We then aim to make SinkV2 @Public
> and remove SinkFunction in Flink 1.18. @PublicEvolving APIs are intended for
> public use. So, I don't see it as a blocker for deprecating SinkFunction
> that we have to make SinkV2 @PublicEvolving first. For reference, this is
> the description of @PublicEvolving:
>
> /**
>  * Annotation to mark classes and methods for public use, but with
> evolving interfaces.
>  *
>  * Classes and methods with this annotation are intended for public
> use and have stable behavior.
>  * However, their interfaces and signatures are not considered to be
> stable and might be changed
>  * across versions.
>  *
>  * This annotation also excludes methods and classes with evolving
> interfaces / signatures within
>  * classes annotated with {@link Public}.
>  */
>
>
> Marking SinkFunction @Deprecated would already signal everyone to move to
> SinkV2, which we as a community, I believe, have a strong interest in. It's
>

Yes, I also believe we all have this strong interest. I just hope that this
can be done in the best possible way that does not confuse users.

I probably still have the same concern regarding its impact on users: if we
mark an API as deprecated, it effectively means the users of this API
should start to migrate to another API (e.g. SinkV2) and we might remove
this API in the future. However, given that we know there are known
problems preventing users from doing so, it seems that we are not ready to
send this message to users right now.

If I understand correctly, I guess you are suggesting that by marking
SinkFunction as deprecated, we can put higher pressure on Flink
contributors to update the existing Flink codebase to improve and use
SinkV2.

I am not sure this is the right way to use @deprecated, which has a
particular meaning for its users rather than contributors. And I am also
not sure we can even pressure contributors of an open-source project into
developing a feature (e.g. migrate all existing SinkFunction subclasses to
SinkV2). IMO, the typical way is for the contributor with interest/time to
work on the feature, or talk to other contributors whether they are willing
to collaborate/work on this, rather than pressuring other contributors into
working on this.


almost comical how long the transition from SourceFunction/SinkFunction to
> Source/Sink takes us. At the same time, we leave ourselves the option to
> make small changes to SinkV2 if any problems arise during the migration of
> these connectors.
>
> I think, we have a bit of a chicken/egg problem here. The pressure for
>

Similar to the reason described above, I am not sure we have a chicken/egg
problem here. The issue here is that SinkV2 is not ready and we have a lot
of existing SinkFunction that is not migrated by ourselves. We (Flink
contributors) probably do not need to mark SinkFunction as deprecated in
order to address these issues in our own codebase.


users and contributors is not high enough to move away from SinkFunction as
> long as it's not deprecated, but at the same time we need people to migrate
> their connectors to see if there are any gaps in SinkV2. I believe, the
> combination proposed above could bridge this problem.
>
> 2) I don't understand the argument of waiting until some of the
> implementations are @Public. How can we make the implementations of the
> SinkV2 API @Public without making SinkV2 @Public? All public methods of
> SinkV2 are part of every implementation. So to me it actually seems to be
> opposite: in order to make any of the implementation @Public we first need
> to make the API @Public.
>

Yeah I also agree with you.


>
> Cheers,
>
> Konstantin
>
> On Mon, Jan 30, 2023 at 13:18, Dong Lin wrote:
>
> > Hi Martijn,
> >
> > Thanks for driving this effort to clean-up the Flink codebase!
> >
> > I like the idea to cleanup Flink codebase to avoid having two Sinks. On
> the
> > other hand, I also think the concern mentioned by Jing makes sense. In
> > addition to thinking in terms of the rule proposed in FLIP-197
> > <
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-197%3A+API+stability+graduation+process
> > >
> > (which
> > seems to focus mostly on the Flink developers' perspective), it might be
> > useful to also think about the story from users' perspective and make
> sure
> > their concerns can be addressed.
> >
> > Typically, by marking an API as deprecated, we are effectively telling
> > users *they should start to migrate to the new API ASAP and we reserve the
> > right to remove this API completely in the next 1-2 releases*. Then it
> > might be reasonable for users to ask questions such as:
> >
> > 1) Does the SinkV2 public API provide all