Re: Flink 1.15 Interval Join error after Deduplicate

2022-10-16 Thread Yuxia Luo
The view A try to  do de-duplication using event time,  which will still 
produce update rows.  if you using proc time  to do de-duplication.Then the 
view A should only produce append only rows.

Best regards,
Yuxia



> 2022年10月15日 上午9:50,liebin...@whu.edu.cn 写道:
> 
> I had a problem with Interval Join after using Deduplicate. I'm using Flink 
> version 1.15.
> 
> I want to use Flink's Interval Join for double-stream association, and my 
> first table needs to be de-duplicated. Here is my sample code.
> 
> ```
> CREATE TEMPORARY TABLE `source` (
>  id INT,
>  name STRING,
>  event_time TIMESTAMP(3),
>  WATERMARK FOR event_time AS event_time
> ) WITH (
>  'connector' = 'datagen'
> );
> 
> 
> CREATE TEMPORARY TABLE B (
>  id INT,
>  `start` INT,
>  `end` INT,
>  event_time TIMESTAMP(3),
>  WATERMARK FOR event_time AS event_time
> ) WITH (
>  'connector' = 'datagen'
> );
> 
> create TEMPORARY view A as
> select id, name, event_time from (
>  select id, name, event_time,
>  row_number() over(partition by id, name, event_time order by event_time asc) 
> as rn
>  from source
> )
> where rn = 1;
> 
> SELECT *
> FROM A, B
> WHERE 
>A.id = B.id AND A.id >= B.`start` AND A.id <= B.`end` AND 
>A.event_time BETWEEN B.event_time - INTERVAL '10' SECOND AND 
>B.event_time + INTERVAL '10' SECOND;
> ```
> 
> I used to preserve the first row of data for the de-duplication, so view A 
> should only produce insert rows, but running the SQL above would produce the 
> following error.
> 
> ```
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.TableException: StreamPhysicalIntervalJoin doesn't 
> support consuming update and delete changes which is produced by node 
> Deduplicate(keep=[FirstRow], key=[id, name, event_time], order=[ROWTIME])
> ```
> 
> How to perform Interval Join after using Deduplicate?



Re: Flink 1.15 Interval Join error after Deduplicate

2022-10-16 Thread LB
Why keep the first row still produce update/delete rows?

There was a similar question before. Jark said that keep the first line should 
produce an append-only stream and interval join can consume from it. The link 
to this question is: 
https://lists.apache.org/thread/z5w5n7o9g87dtb7p7q7p7vrvowqhth2d

Did the latest version of Flink change deduplication?

Re: Flink 1.15 Interval Join error after Deduplicate

2022-10-16 Thread Yuxia Luo
> view A should only produce insert rows
No, the view A will still produce update/delete rows. 

Best regards,
Yuxia



> 2022年10月15日 上午9:50,liebin...@whu.edu.cn 写道:
> 
> view A should only produce insert rows



Re: SQL Changes between 1.14 and 1.15?

2022-10-16 Thread Yuxia Luo
Thanks for raising it. It seems a bug that introduced by this pr [1]. I have 
created [FLINK-29651] to trace it. 

[1] https://github.com/apache/flink/pull/19001 

[2] https://issues.apache.org/jira/browse/FLINK-26520 


Best regards,
Yuxia



> 2022年10月14日 下午9:19,PACE, JAMES  写道:
> 
> We’ve noticed the following difference in sql when upgrading from flink 
> 1.14.5 to 1.15.2 around characters that are escaped in an sql statement:
>  
> This statement:
>   tableEnvironment.executeSql("select * from testTable WHERE lower(field1) 
> LIKE 'b\"cd\"e%'");
> produces a runtime error in flink 1.15.2, but executes properly in flink 
> 1.14.5
>  
> This can be worked around by escaping the backslash, changing the statement 
> to:
>   tableEnvironment.executeSql("select * from testTable WHERE lower(field1) 
> LIKE 'b\\\"cd\\\"e%'");
>  
> This code illustrates the issue:
>  
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.Schema;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>  
> public class TestCase3 {
> public static void main(String[] args) throws Exception {
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
>  
> TestData testData = new TestData();
> testData.setField1("b\"cd\"e");
> DataStream stream = env.fromElements(testData);
> stream.print();
> final StreamTableEnvironment tableEnvironment = 
> StreamTableEnvironment.create(env);
> tableEnvironment.createTemporaryView("testTable", stream, 
> Schema.newBuilder().build());
>  
> // Works with Flink 1.14.x, flink runtime errors in 1.15.2.  
> Uncomment to see runtime trace
> //tableEnvironment.executeSql("select *, '1' as run from testTable 
> WHERE lower(field1) LIKE 'b\"cd\"e%'").print();
> // Works with 1.15.2
> tableEnvironment.executeSql("select * from testTable WHERE 
> lower(field1) LIKE 'b\\\"cd\\\"e%'").print();
>  
> env.execute("TestCase");
> }
>  
> public static class TestData {
> private String field1;
>  
> public String getField1() { return field1; }
> public void setField1(String field1) { this.field1 = field1; }
> }
> }
>  
> Thanks
> Jim



Re: Job Manager getting restarted while restarting task manager

2022-10-16 Thread yu'an huang
Are you able to replay this scenario? Did you accidently send killing
signal to the job mananger process?

On Thu, 13 Oct 2022 at 4:02 PM, Puneet Duggal 
wrote:

> Hi,
>
> We use session deployment mode with HA setup. Currently we have 3 job
> managers and 3 task managers running on flink version 1.12.1. Please find
> attached the complete job manager logs.
>
>
>
>
>
> On 13-Oct-2022, at 7:28 AM, Xintong Song  wrote:
>
> I meant your jobmanager also received a SIGTERM signal, and you would need
> to figure out where it comes from.
>
> To be specific, this line of log:
>
>> 2022-10-11 22:11:21,683 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - RECEIVED
>> SIGNAL 15: SIGTERM. Shutting down as requested.
>>
>
> I believe this is from the jobmanager log, as `ClusterEntrypoint` is a
> class used by jobmanager only.
>
> Best,
> Xintong
>
>
>
> On Thu, Oct 13, 2022 at 9:06 AM yu'an huang  wrote:
>
>> Hi,
>>
>> Which deployment mode do you use? What is the Flink version?
>> I think killing TaskManagers won't make the JobMananger restart. You can
>> provide the whole log as an attachment to investigate.
>>
>> On Wed, 12 Oct 2022 at 6:01 PM, Puneet Duggal 
>> wrote:
>>
>>> Hi Xintong Song,
>>>
>>> Thanks for your immediate reply. Yes, I do restart task manager via kill
>>> command and then flink restart because I have seen cases where simple flink
>>> restart does not pickup the latest configuration. But what I am confused
>>> about is why killing the task manager process and then restarting it is
>>> causing the job manager to stop and restart.
>>>
>>> Regards,
>>> Puneet
>>>
>>>
>>> On 12-Oct-2022, at 7:33 AM, Xintong Song  wrote:
>>>
>>> The log shows that the jobmanager received a SIGTERM signal from
>>> external. Depending on how you deploy Flink, that could be a 'kill '
>>> command, or a kubernetes pod removal / eviction, etc. You may want to check
>>> where the signal came from.
>>>
>>> Best,
>>> Xintong
>>>
>>>
>>>
>>> On Wed, Oct 12, 2022 at 6:26 AM Puneet Duggal <
>>> puneetduggal1...@gmail.com> wrote:
>>>
 Hi,

 I am facing an issue where when restarting task manager after adding
 some configuration changes, even though task manager restarts successfully
 with the updated configuration change, is causing the leader job manager to
 restart as well. Pasting the leader job manager logs here


 2022-10-11 22:11:02,207 WARN  akka.remote.ReliableDeliverySupervisor
[] - Association with remote system [
 akka.tcp://flink@:35376] has failed, address is now gated for
 [50] ms. Reason: [Disassociated]
 2022-10-11 22:11:02,411 WARN
 akka.remote.transport.netty.NettyTransport   [] - Remote
 connection to [null] failed with java.net.ConnectException: Connection
 refused: /:35376
 2022-10-11 22:11:02,413 WARN  akka.remote.ReliableDeliverySupervisor
[] - Association with remote system [
 akka.tcp://flink@:35376] has failed, address is now gated for
 [50] ms. Reason: [Association failed with [
 akka.tcp://flink@:35376]] Caused by:
 [java.net.ConnectException: Connection refused: /:35376]
 2022-10-11 22:11:02,682 WARN
 akka.remote.transport.netty.NettyTransport   [] - Remote
 connection to [null] failed with java.net.ConnectException: Connection
 refused: /:35376
 2022-10-11 22:11:02,683 WARN  akka.remote.ReliableDeliverySupervisor
[] - Association with remote system [
 akka.tcp://flink@:35376] has failed, address is now gated for
 [50] ms. Reason: [Association failed with [
 akka.tcp://flink@:35376]] Caused by:
 [java.net.ConnectException: Connection refused: /:35376]
 2022-10-11 22:11:12,702 WARN
 akka.remote.transport.netty.NettyTransport   [] - Remote
 connection to [null] failed with java.net.ConnectException: Connection
 refused: /:35376
 2022-10-11 22:11:12,703 WARN  akka.remote.ReliableDeliverySupervisor
[] - Association with remote system [
 akka.tcp://flink@:35376] has failed, address is now gated for
 [50] ms. Reason: [Association failed with [
 akka.tcp://flink@:35376]] Caused by:
 [java.net.ConnectException: Connection refused: /:35376]
 2022-10-11 22:11:21,683 INFO
 org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - RECEIVED
 SIGNAL 15: SIGTERM. Shutting down as requested.
 2022-10-11 22:11:21,687 INFO  org.apache.flink.runtime.blob.BlobServer
[] - Stopped BLOB server at 0.0.0.0:33887


 Regards,
 Puneet



>>>
>