[jira] [Created] (FLINK-20449) UnalignedCheckpointITCase times out

2020-12-01 Thread Robert Metzger (Jira)
Robert Metzger created FLINK-20449:
--

 Summary: UnalignedCheckpointITCase times out
 Key: FLINK-20449
 URL: https://issues.apache.org/jira/browse/FLINK-20449
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.13.0
Reporter: Robert Metzger
 Fix For: 1.13.0


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=10416&view=logs&j=39d5b1d5-3b41-54dc-6458-1e2ddd1cdcf3&t=a99e99c7-21cd-5a1f-7274-585e62b72f56

{code}
2020-12-02T01:24:33.7219846Z [ERROR] Tests run: 10, Failures: 0, Errors: 2, Skipped: 0, Time elapsed: 746.887 s <<< FAILURE! - in org.apache.flink.test.checkpointing.UnalignedCheckpointITCase
2020-12-02T01:24:33.7220860Z [ERROR] execute[Parallel cogroup, p = 10](org.apache.flink.test.checkpointing.UnalignedCheckpointITCase)  Time elapsed: 300.24 s  <<< ERROR!
2020-12-02T01:24:33.7221663Z org.junit.runners.model.TestTimedOutException: test timed out after 300 seconds
2020-12-02T01:24:33.7222017Z    at sun.misc.Unsafe.park(Native Method)
2020-12-02T01:24:33.7222390Z    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
2020-12-02T01:24:33.7222882Z    at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
2020-12-02T01:24:33.7223356Z    at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
2020-12-02T01:24:33.7223840Z    at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
2020-12-02T01:24:33.7224320Z    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
2020-12-02T01:24:33.7224864Z    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1842)
2020-12-02T01:24:33.7225500Z    at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:70)
2020-12-02T01:24:33.7226297Z    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1822)
2020-12-02T01:24:33.7226929Z    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1804)
2020-12-02T01:24:33.7227572Z    at org.apache.flink.test.checkpointing.UnalignedCheckpointTestBase.execute(UnalignedCheckpointTestBase.java:122)
2020-12-02T01:24:33.7228187Z    at org.apache.flink.test.checkpointing.UnalignedCheckpointITCase.execute(UnalignedCheckpointITCase.java:159)
2020-12-02T01:24:33.7228680Z    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
2020-12-02T01:24:33.7229099Z    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
2020-12-02T01:24:33.7229617Z    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
2020-12-02T01:24:33.7230068Z    at java.lang.reflect.Method.invoke(Method.java:498)
2020-12-02T01:24:33.7230733Z    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
2020-12-02T01:24:33.7231262Z    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
2020-12-02T01:24:33.7231775Z    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
2020-12-02T01:24:33.7232276Z    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
2020-12-02T01:24:33.7232732Z    at org.junit.rules.Verifier$1.evaluate(Verifier.java:35)
2020-12-02T01:24:33.7233144Z    at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
2020-12-02T01:24:33.7233663Z    at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
2020-12-02T01:24:33.7234239Z    at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
2020-12-02T01:24:33.7234735Z    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
2020-12-02T01:24:33.7235093Z    at java.lang.Thread.run(Thread.java:748)
2020-12-02T01:24:33.7235305Z 
2020-12-02T01:24:33.7235728Z [ERROR] execute[Parallel union, p = 10](org.apache.flink.test.checkpointing.UnalignedCheckpointITCase)  Time elapsed: 300.539 s  <<< ERROR!
2020-12-02T01:24:33.7236436Z org.junit.runners.model.TestTimedOutException: test timed out after 300 seconds
2020-12-02T01:24:33.7236790Z    at sun.misc.Unsafe.park(Native Method)
2020-12-02T01:24:33.7237158Z    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
2020-12-02T01:24:33.7237641Z    at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
2020-12-02T01:24:33.7238118Z    at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
2020-12-02T01:24:33.7238599Z    at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
2020-12-02T01:24:33.7239885Z    at 

[jira] [Created] (FLINK-20448) Obsolete generated avro classes

2020-12-01 Thread Rui Li (Jira)
Rui Li created FLINK-20448:
--

 Summary: Obsolete generated avro classes
 Key: FLINK-20448
 URL: https://issues.apache.org/jira/browse/FLINK-20448
 Project: Flink
  Issue Type: Test
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Reporter: Rui Li






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: State Processor API SQL State

2020-12-01 Thread Yun Tang
Hi Dom,

+ user mail list

Once you know the state descriptor, I think you can query the join state. The 
state names are easy to get via [1]; they should be "left-records" and 
"right-records". You can then check what kind of join it is and whether it has 
a unique key to determine what kind of state is used (value state or map 
state). The last part is to find out what the RowData type is in your join; 
maybe the SQL folks could answer this, or you might find it yourself by 
dumping the memory of your taskmanager.

[1] 
https://github.com/apache/flink/blob/7a7c87096ab76f416cd7c393240faa8454db36f0/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperator.java#L83
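
For illustration, here is a minimal, untested sketch against the 1.12-era
State Processor API. The operator uid, the state layout (a MapState named
"left-records" mapping each record to its appearance count, as in the
non-unique-key case), and the row type are all assumptions that you would
have to adapt to your actual job; for SQL jobs the uid is auto-generated, so
identifying it is the hard part.

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.util.Collector;

public class ReadJoinState {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        ExistingSavepoint savepoint =
                Savepoint.load(env, "file:///path/to/checkpoint", new MemoryStateBackend());

        // "join-operator-uid" is a placeholder: for SQL jobs you first have to
        // figure out the auto-generated uid/hash of the StreamingJoinOperator.
        DataSet<RowData> leftRows =
                savepoint.readKeyedState("join-operator-uid", new LeftRecordsReader());
        leftRows.print();
    }

    static class LeftRecordsReader extends KeyedStateReaderFunction<RowData, RowData> {

        private MapState<RowData, Integer> leftRecords;

        @Override
        public void open(Configuration parameters) {
            // Assumed layout: record -> appearance count; the row type must
            // match the actual schema of the join's left input.
            RowType rowType = RowType.of(new BigIntType(), new VarCharType(VarCharType.MAX_LENGTH));
            leftRecords = getRuntimeContext().getMapState(
                    new MapStateDescriptor<>("left-records", InternalTypeInfo.of(rowType), Types.INT));
        }

        @Override
        public void readKey(RowData key, Context ctx, Collector<RowData> out) throws Exception {
            for (RowData row : leftRecords.keys()) {
                out.collect(row);
            }
        }
    }
}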

Best
Yun Tang




From: Dominik Wosiński 
Sent: Tuesday, December 1, 2020 21:05
To: dev 
Subject: State Processor API SQL State

Hey,
Is it currently possible to obtain the state that was created by SQL query
via the State Processor API? I am able to load the checkpoint via the State
Processor API, but I wasn't able to think of a way to access the internal
state of my JOIN Query.

Best Regards,
Dom.


[jira] [Created] (FLINK-20447) Querying group by PK does not work

2020-12-01 Thread Zhenwei Feng (Jira)
Zhenwei Feng created FLINK-20447:


 Summary: Querying group by PK does not work
 Key: FLINK-20447
 URL: https://issues.apache.org/jira/browse/FLINK-20447
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API
Affects Versions: 1.11.2
Reporter: Zhenwei Feng


Since the PRIMARY KEY is unique, it should be feasible to select the other columns when grouping by the PK.

The problem could be reproduced by creating a simple table:
{code:java}
CREATE TABLE test_table(
  Code STRING,
  Name  STRING,
  ...,
  PRIMARY KEY (Code) NOT ENFORCED
)WITH (...)
{code}
then running the SQL statement `SELECT * FROM test_table GROUP BY Code`. The 
following exception is thrown:

{code:java}
 org.apache.calcite.sql.validate.SqlValidatorException: Expression 
'test_table.Name' is not being grouped
{code}
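
Until the planner derives functional dependencies from the primary key, a
hedged workaround sketch (assuming a tableEnv with test_table registered, as
above) is to aggregate the dependent columns explicitly:

{code:java}
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class GroupByPkWorkaround {
    // Code is the primary key, so Name has exactly one value per group and
    // any aggregate (MAX/MIN/...) returns that single value unchanged.
    public static Table selectByPk(TableEnvironment tableEnv) {
        return tableEnv.sqlQuery(
                "SELECT Code, MAX(Name) AS Name FROM test_table GROUP BY Code");
    }
}
{code}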


--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS]Select Chinese words from table union all get java.lang.OutOfMemoryError: Java heap space on Flink 1.9.x with Blink planner

2020-12-01 Thread Smile
*Code in Flink 1.9.3:*
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class StreamingJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings mySetting =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, mySetting);

        env.setParallelism(1);

        DataStream<String> oriStream = env.fromElements("test", "union all");

        Table testTable = tableEnv.fromDataStream(oriStream, "text");
        tableEnv.registerTable("test_table", testTable);

        Table regularJoin = tableEnv.sqlQuery(
                "SELECT\n" +
                "text AS text,\n" +
                "'中文' AS another_text\n" +
                "FROM\n" +
                "test_table\n" +
                "UNION ALL\n" +
                "SELECT\n" +
                "'another_text' AS text,\n" +
                "'中文' AS another_text\n" +
                "FROM\n" +
                "test_table");
        DataStream<Row> appendStream = tableEnv.toAppendStream(regularJoin, Row.class);
        appendStream.print();

        env.execute("test-union-all");
    }
}

*Error Message:*
Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:3332)
    at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
    at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
    at java.lang.StringBuilder.append(StringBuilder.java:136)
    at scala.StringContext.standardInterpolator(StringContext.scala:126)
    at scala.StringContext.s(StringContext.scala:95)
    at org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableStringConstants(CodeGeneratorContext.scala:716)
    at org.apache.flink.table.planner.codegen.GenerateUtils$.generateLiteral(GenerateUtils.scala:357)
    at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitLiteral(ExprCodeGenerator.scala:392)
    at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitLiteral(ExprCodeGenerator.scala:51)
    at org.apache.calcite.rex.RexLiteral.accept(RexLiteral.java:1137)
    at org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$8.apply(ExprCodeGenerator.scala:448)
    at org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$8.apply(ExprCodeGenerator.scala:439)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:439)
    at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:51)
    at org.apache.calcite.rex.RexCall.accept(RexCall.java:191)
    at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:131)
    at org.apache.flink.table.planner.codegen.ExpressionReducer$$anonfun$4.apply(ExpressionReducer.scala:84)
    at org.apache.flink.table.planner.codegen.ExpressionReducer$$anonfun$4.apply(ExpressionReducer.scala:84)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.codegen.ExpressionReducer.reduce(ExpressionReducer.scala:84)

*Code in Flink 1.11.2 with some new syntax:*
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import 

[DISCUSS]Select Chinese words from table union all get java.lang.OutOfMemoryError: Java heap space on Flink 1.9.x with Blink planner

2020-12-01 Thread Smile
When using Flink 1.9.3 with the Blink planner, I tried to union some tables
with Chinese string constants and got an OutOfMemoryError: Java heap space.
Here are the code and the error message.

When I switched to the old planner, it worked.
When I upgraded Flink to 1.11.2, it also worked.
It also works when I remove either the 'union all' or the Chinese words.

I searched JIRA and found FLINK-16113 with some bugfixes for Chinese
characters, but what about the memory? And what about union all?


*Code in Flink 1.9.3:*


*Error Message:*


*Code in Flink 1.11.2 with some new syntax:*




-
Best wishes.
Smile
--
Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/


[jira] [Created] (FLINK-20446) NoMatchingTableFactoryException

2020-12-01 Thread Ke Li (Jira)
Ke Li created FLINK-20446:
-

 Summary: NoMatchingTableFactoryException
 Key: FLINK-20446
 URL: https://issues.apache.org/jira/browse/FLINK-20446
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Client
Affects Versions: 1.11.2
 Environment: * Version:1.11.2
Reporter: Ke Li


When I use the SQL Client with a YAML configuration, an error is reported. The 
command is as follows:
{code:java}
./sql-client.sh embedded -e /root/flink-sql-client/sql-client-demo.yml
{code}
sql-client-demo.yml:
{code:java}
tables:
  - name: SourceTable
    type: source-table
    update-mode: append
    connector:
      type: datagen
      rows-per-second: 5
      fields:
        f_sequence:
          kind: sequence
          start: 1
          end: 1000
        f_random:
          min: 1
          max: 1000
        f_random_str:
          length: 10
    schema:
      - name: f_sequence
        data-type: INT
      - name: f_random
        data-type: INT
      - name: f_random_str
        data-type: STRING
{code}
The error is as follows:
{code:java}
No default environment specified.
No default environment specified.
Searching for '/data/data_gas/flink/flink-1.11.2/conf/sql-client-defaults.yaml'...found.
Reading default environment from: file:/data/data_gas/flink/flink-1.11.2/conf/sql-client-defaults.yaml
Reading session environment from: file:/root/flink-sql-client/sql-client-demo.yml

Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
    at org.apache.flink.table.client.SqlClient.main(SqlClient.java:213)
Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context.
    at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:870)
    at org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:227)
    at org.apache.flink.table.client.SqlClient.start(SqlClient.java:108)
    at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201)
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath.

Reason: Required context properties mismatch.

The matching candidates:
org.apache.flink.table.sources.CsvAppendTableSourceFactory
Mismatched properties:
'connector.type' expects 'filesystem', but is 'datagen'
'format.type' expects 'csv', but is 'json'

The following properties are requested:
connector.fields.f_random.max=1000
connector.fields.f_random.min=1
connector.fields.f_random_str.length=10
connector.fields.f_sequence.end=1000
connector.fields.f_sequence.kind=sequence
connector.fields.f_sequence.start=1
connector.rows-per-second=5
connector.type=datagen
format.type=json
schema.0.data-type=INT
schema.0.name=f_sequence
schema.1.data-type=INT
schema.1.name=f_random
schema.2.data-type=STRING
schema.2.name=f_random_str
update-mode=append

The following factories have been considered:
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
org.apache.flink.connector.jdbc.table.JdbcTableSourceSinkFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.filesystem.FileSystemTableFactory
    at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
    at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
    at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
    at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113)
    at org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:384)
    at org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$7(ExecutionContext.java:638)
    at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
    at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:636)
    at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:523)
    at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:183)
    at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:136)
    at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:859)
    ... 3 more
{code}
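
As a hedged note (not from the original report): in Flink 1.11 the datagen
connector is only implemented against the new table factory stack, so it has
no legacy TableSourceFactory that the 'connector.type' properties generated
from the YAML environment file could match. A sketch of the DDL-based
alternative is shown below; the same CREATE TABLE statement can also be pasted
directly into the SQL Client shell:

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DatagenDdlExample {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
        // Register the source via DDL, using the new 'connector' option
        // instead of the legacy 'connector.type' properties.
        tableEnv.executeSql(
                "CREATE TABLE SourceTable (\n" +
                "  f_sequence INT,\n" +
                "  f_random INT,\n" +
                "  f_random_str STRING\n" +
                ") WITH (\n" +
                "  'connector' = 'datagen',\n" +
                "  'rows-per-second' = '5',\n" +
                "  'fields.f_sequence.kind' = 'sequence',\n" +
                "  'fields.f_sequence.start' = '1',\n" +
                "  'fields.f_sequence.end' = '1000',\n" +
                "  'fields.f_random.min' = '1',\n" +
                "  'fields.f_random.max' = '1000',\n" +
                "  'fields.f_random_str.length' = '10'\n" +
                ")");
    }
}
{code}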



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20445) NoMatchingTableFactoryException

2020-12-01 Thread Ke Li (Jira)
Ke Li created FLINK-20445:
-

 Summary: NoMatchingTableFactoryException
 Key: FLINK-20445
 URL: https://issues.apache.org/jira/browse/FLINK-20445
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Client
Affects Versions: 1.11.2
 Environment: * Version:1.11.2
Reporter: Ke Li


When I use the SQL Client with a YAML configuration, an error is reported. The 
command is as follows:
{code:java}
./sql-client.sh embedded -e /root/flink-sql-client/sql-client-demo.yml
{code}
sql-client-demo.yml:
{code:java}
tables:
  - name: SourceTable
    type: source-table
    update-mode: append
    connector:
      type: datagen
      rows-per-second: 5
      fields:
        f_sequence:
          kind: sequence
          start: 1
          end: 1000
        f_random:
          min: 1
          max: 1000
        f_random_str:
          length: 10
    schema:
      - name: f_sequence
        data-type: INT
      - name: f_random
        data-type: INT
      - name: f_random_str
        data-type: STRING
{code}
The error is as follows:
{code:java}
No default environment specified.
No default environment specified.
Searching for '/data/data_gas/flink/flink-1.11.2/conf/sql-client-defaults.yaml'...found.
Reading default environment from: file:/data/data_gas/flink/flink-1.11.2/conf/sql-client-defaults.yaml
Reading session environment from: file:/root/flink-sql-client/sql-client-demo.yml

Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
    at org.apache.flink.table.client.SqlClient.main(SqlClient.java:213)
Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context.
    at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:870)
    at org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:227)
    at org.apache.flink.table.client.SqlClient.start(SqlClient.java:108)
    at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201)
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath.

Reason: Required context properties mismatch.

The matching candidates:
org.apache.flink.table.sources.CsvAppendTableSourceFactory
Mismatched properties:
'connector.type' expects 'filesystem', but is 'datagen'
'format.type' expects 'csv', but is 'json'

The following properties are requested:
connector.fields.f_random.max=1000
connector.fields.f_random.min=1
connector.fields.f_random_str.length=10
connector.fields.f_sequence.end=1000
connector.fields.f_sequence.kind=sequence
connector.fields.f_sequence.start=1
connector.rows-per-second=5
connector.type=datagen
format.type=json
schema.0.data-type=INT
schema.0.name=f_sequence
schema.1.data-type=INT
schema.1.name=f_random
schema.2.data-type=STRING
schema.2.name=f_random_str
update-mode=append

The following factories have been considered:
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
org.apache.flink.connector.jdbc.table.JdbcTableSourceSinkFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.filesystem.FileSystemTableFactory
    at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
    at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
    at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
    at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113)
    at org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:384)
    at org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$7(ExecutionContext.java:638)
    at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
    at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:636)
    at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:523)
    at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:183)
    at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:136)
    at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:859)
    ... 3 more
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


State Processor API SQL State

2020-12-01 Thread Dominik Wosiński
Hey,
Is it currently possible to obtain the state that was created by SQL query
via the State Processor API? I am able to load the checkpoint via the State
Processor API, but I wasn't able to think of a way to access the internal
state of my JOIN Query.

Best Regards,
Dom.


[jira] [Created] (FLINK-20444) Chain AsyncWaitOperator to new sources

2020-12-01 Thread Arvid Heise (Jira)
Arvid Heise created FLINK-20444:
---

 Summary: Chain AsyncWaitOperator to new sources
 Key: FLINK-20444
 URL: https://issues.apache.org/jira/browse/FLINK-20444
 Project: Flink
  Issue Type: Improvement
Reporter: Arvid Heise


For legacy sources, we had to disable chaining because of incompatible 
threading models.
New sources, however, work fine with chaining, and enabling it would give some 
users massive performance improvements.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20443) ContinuousProcessingTimeTrigger loses data in the last interval of each window

2020-12-01 Thread Gee (Jira)
Gee created FLINK-20443:
---

 Summary: ContinuousProcessingTimeTrigger loses data in the last 
interval of each window
 Key: FLINK-20443
 URL: https://issues.apache.org/jira/browse/FLINK-20443
 Project: Flink
  Issue Type: Improvement
  Components: API / DataStream
Reporter: Gee


 
{code:java}
srcStream
.timeWindowAll(Time.seconds(60))
.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10)))...
{code}
 

This correctly computes results for the following intervals: 0-10s, 10-20s, 
20-30s, 30-40s, 40-50s.

But it loses the data that arrives in the 50-60s interval.

This is because when the first window ends, the current processing time is 
just under 60s and therefore not equal to the window end time (60s), so the 
trigger never enters the `if` branch that would fire the final interval.
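
A minimal workaround sketch (not part of the original report; the class name
is illustrative): a trigger that fires at every interval boundary and
additionally at the window end, so the final partial interval is also emitted.

{code:java}
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class ContinuousWithEndOfWindowTrigger extends Trigger<Object, TimeWindow> {

    private final long interval;

    private ContinuousWithEndOfWindowTrigger(long interval) {
        this.interval = interval;
    }

    public static ContinuousWithEndOfWindowTrigger of(Time interval) {
        return new ContinuousWithEndOfWindowTrigger(interval.toMilliseconds());
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
        long now = ctx.getCurrentProcessingTime();
        // next interval boundary, capped at the window end so that the last
        // timer fires exactly at window.maxTimestamp()
        long nextFire = Math.min(now - (now % interval) + interval, window.maxTimestamp());
        ctx.registerProcessingTimeTimer(nextFire);
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        if (time < window.maxTimestamp()) {
            ctx.registerProcessingTimeTimer(Math.min(time + interval, window.maxTimestamp()));
        }
        // fires for both intermediate boundaries and the final partial interval
        return TriggerResult.FIRE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) {
        ctx.deleteProcessingTimeTimer(window.maxTimestamp());
    }
}
{code}

Usage would then be 
{{.trigger(ContinuousWithEndOfWindowTrigger.of(Time.seconds(10)))}} in place 
of the built-in trigger.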



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20442) Fix license documentation mistakes in flink-python.jar

2020-12-01 Thread Robert Metzger (Jira)
Robert Metzger created FLINK-20442:
--

 Summary: Fix license documentation mistakes in flink-python.jar
 Key: FLINK-20442
 URL: https://issues.apache.org/jira/browse/FLINK-20442
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.12.0
Reporter: Robert Metzger
 Fix For: 1.12.0


Issues reported by Chesnay:

- The flink-python jar contains 2 license files in the root directory and 
another 2 in the META-INF directory. This should be reduced down to 1 under 
META-INF. I'm inclined to block the release on this because the root license is 
BSD.
- The flink-python jar appears to bundle lz4 (native libraries under win32/, 
linux/ and darwin/), but this is neither listed in the NOTICE nor do we have an 
explicit license file for it.

Other minor things that we should address in the future:
- opt/python contains some LICENSE files that should instead be placed under 
licenses/
- licenses/ contains a stray "ASM" file containing the ASM license. It's not a 
problem (because it is identical with our intended copy), but it indicates that 
something is amiss. This seems to originate from the flink-python jar, which 
bundles some beam stuff, which bundles bytebuddy, which bundles this license 
file. From what I can tell bytebuddy is not actually bundling ASM though; they 
just bundle the license for whatever reason. It is not listed as bundled in the 
flink-python NOTICE though, so I wouldn't block the release on it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[CANCELLED][VOTE] Release 1.12.0, release candidate #2

2020-12-01 Thread Robert Metzger
Thanks a lot for checking the release candidate so quickly.
I agree that the BSD License file in the root of the jar is a red flag.
I filed a ticket for addressing the issues Chesnay found:
https://issues.apache.org/jira/browse/FLINK-20442

I'm hereby officially cancelling this release candidate.

On Tue, Dec 1, 2020 at 1:04 PM Chesnay Schepler  wrote:

> -1
>
> -The flink-python jar contains 2 license files in the root directory and
> another 2 in the META-INF directory. This should be reduced down to 1 under
> META-INF. I'm inclined to block the release on this because the root
> license is BSD.
> - The flink-python jar appears to bundle lz4 (native libraries under
> win32/, linux/ and darwin/), but this is neither listed in the NOTICE nor
> do we have an *explicit* license file for it.
>
> Other minor things that we should address in the future:
> - opt/python contains some LICENSE files that should instead be placed
> under licenses/
> - licenses/ contains a stray "ASM" file containing the ASM license. It's
> not a *problem *(because it is identical with our intended copy), but it
> indicates that something is amiss. This seems to originate from the
> flink-python jar, which bundles some beam stuff, which bundles bytebuddy,
> which bundles this license file. From what I can tell bytebuddy is not
> actually bundling ASM though; they just bundle the license for whatever
> reason. It is not listed as bundled in the flink-python NOTICE though, so I
> wouldn't block the release on it.
>
>
> On 12/1/2020 7:35 AM, Robert Metzger wrote:
>
> Hi everyone,
>
> It seems that all blockers for 1.12.0 have been resolved, this is the first
> voting release candidate.
>
> Please review and vote on the release candidate #2 for the version 1.12.0,
> as follows:
>
> [ ] +1, Approve the release
> [ ] -1, Do not approve the release (please provide specific comments)
>
>
> The complete staging area is available for your review, which includes:
> * JIRA release notes [1a], and website release notes [1b]
> * the official Apache source release and binary convenience releases to be
> deployed to dist.apache.org [2], which are signed with the key with
> fingerprint D9839159 [3],
> * all artifacts to be deployed to the Maven Central Repository [4],
> * source code tag "release-1.12.0-rc2" [5]
>
> We will soon publish the PR for the release announcement blog post!
>
> The vote will be open for at least 72 hours (Friday 7:35am CET). It is
> adopted by majority approval, with at least 3 PMC affirmative votes.
>
> Thanks,
> Dian & Robert
>
> [1a] https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12348263
> [1b] https://github.com/apache/flink/pull/14195
> [2] https://dist.apache.org/repos/dist/dev/flink/flink-1.12.0-rc2/
> [3] https://dist.apache.org/repos/dist/release/flink/KEYS
> [4] https://repository.apache.org/content/repositories/orgapacheflink-1403
> [5] https://github.com/apache/flink/releases/tag/release-1.12.0-rc2
>
>
>


Re: [VOTE] Release 1.12.0, release candidate #2

2020-12-01 Thread Chesnay Schepler

-1

- The flink-python jar contains 2 license files in the root directory and 
another 2 in the META-INF directory. This should be reduced down to 1 
under META-INF. I'm inclined to block the release on this because the 
root license is BSD.
- The flink-python jar appears to bundle lz4 (native libraries under 
win32/, linux/ and darwin/), but this is neither listed in the NOTICE 
nor do we have an /explicit/ license file for it.


Other minor things that we should address in the future:
- opt/python contains some LICENSE files that should instead be placed 
under licenses/
- licenses/ contains a stray "ASM" file containing the ASM license. It's 
not a /problem /(because it is identical with our intended copy), but it 
indicates that something is amiss. This seems to originate from the 
flink-python jar, which bundles some beam stuff, which bundles 
bytebuddy, which bundles this license file. From what I can tell 
bytebuddy is not actually bundling ASM though; they just bundle the 
license for whatever reason. It is not listed as bundled in the 
flink-python NOTICE though, so I wouldn't block the release on it.



On 12/1/2020 7:35 AM, Robert Metzger wrote:

Hi everyone,

It seems that all blockers for 1.12.0 have been resolved, this is the first
voting release candidate.

Please review and vote on the release candidate #2 for the version 1.12.0,
as follows:

[ ] +1, Approve the release
[ ] -1, Do not approve the release (please provide specific comments)


The complete staging area is available for your review, which includes:
* JIRA release notes [1a], and website release notes [1b]
* the official Apache source release and binary convenience releases to be
deployed to dist.apache.org [2], which are signed with the key with
fingerprint D9839159 [3],
* all artifacts to be deployed to the Maven Central Repository [4],
* source code tag "release-1.12.0-rc2" [5]

We will soon publish the PR for the release announcement blog post!

The vote will be open for at least 72 hours (Friday 7:35am CET). It is
adopted by majority approval, with at least 3 PMC affirmative votes.

Thanks,
Dian & Robert

[1a] https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12348263
[1b] https://github.com/apache/flink/pull/14195
[2] https://dist.apache.org/repos/dist/dev/flink/flink-1.12.0-rc2/
[3] https://dist.apache.org/repos/dist/release/flink/KEYS
[4] https://repository.apache.org/content/repositories/orgapacheflink-1403
[5] https://github.com/apache/flink/releases/tag/release-1.12.0-rc2





[jira] [Created] (FLINK-20441) Deprecate CheckpointConfig.setPreferCheckpointForRecovery

2020-12-01 Thread Till Rohrmann (Jira)
Till Rohrmann created FLINK-20441:
-

 Summary: Deprecate CheckpointConfig.setPreferCheckpointForRecovery
 Key: FLINK-20441
 URL: https://issues.apache.org/jira/browse/FLINK-20441
 Project: Flink
  Issue Type: Task
  Components: API / DataStream, Runtime / Checkpointing
Affects Versions: 1.12.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
 Fix For: 1.13.0


According to FLINK-20427, we should deprecate 
{{CheckpointConfig.setPreferCheckpointForRecovery}} so that we can remove it in 
the next release.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20440) `LAST_VALUE` aggregate function cannot be used in a HOP window

2020-12-01 Thread zouyunhe (Jira)
zouyunhe created FLINK-20440:


 Summary: `LAST_VALUE` aggregate function cannot be used in a HOP 
window
 Key: FLINK-20440
 URL: https://issues.apache.org/jira/browse/FLINK-20440
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.11.0
Reporter: zouyunhe


Hi, I ran a SQL job which uses the `last_value` aggregate function in a hop 
window. The SQL is shown below:

 
{code:java}
create table test_in(
  id BIGINT,
  `name` VARCHAR,
  cost INT,
  proctime as PROCTIME()
) with (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'test_in',
  'connector.startup-mode' = 'latest-offset',
  'connector.properties.bootstrap.servers' = '',
  'connector.properties.group.id' = 'cdbddd',
  'connector.properties.zookeeper.connect' = '',
  'format.type' = 'csv'
);

create table test_mysql(
  id BIGINT,
  `name` VARCHAR,
  COST DOUBLE
) with (
  'connector.type' = 'jdbc',
  'connector.url' = '',
  'connector.table' = 'abc',
  'connector.username' = 'abcdd',
  'connector.write.flush.interval' = '2s'
);

insert into
  `test_mysql`
select
  a.id,
  last_value(a.`name`),
  last_value(a.cost)
from
  test_in as a
group by
  id,
  HOP(PROCTIME(), interval '10' second, interval '30' second);
{code}
and when submitting the job, the following exception is thrown:
{code:java}
org.apache.flink.table.api.ValidationException: Function class 'org.apache.flink.table.planner.functions.aggfunctions.LastValueAggFunction.StringLastValueAggFunction' does not implement at least one method named 'merge' which is public, not abstract and (in case of table functions) not static.
    at org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils$.checkAndExtractMethods(UserDefinedFunctionUtils.scala:442)
    at org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils$.getUserDefinedMethod(UserDefinedFunctionUtils.scala:318)
    at org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils$.getUserDefinedMethod(UserDefinedFunctionUtils.scala:273)
    at org.apache.flink.table.planner.codegen.agg.ImperativeAggCodeGen.checkNeededMethods(ImperativeAggCodeGen.scala:474)
    at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.$anonfun$checkNeededMethods$1(AggsHandlerCodeGenerator.scala:1116)
    at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.$anonfun$checkNeededMethods$1$adapted(AggsHandlerCodeGenerator.scala:1116)
    at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
    at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
    at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.checkNeededMethods(AggsHandlerCodeGenerator.scala:1116)
    at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.genMerge(AggsHandlerCodeGenerator.scala:929)
    at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.generateNamespaceAggsHandler(AggsHandlerCodeGenerator.scala:578)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.createAggsHandler(StreamExecGroupWindowAggregateBase.scala:248)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlanInternal(StreamExecGroupWindowAggregateBase.scala:162)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlanInternal(StreamExecGroupWindowAggregateBase.scala:54)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlan(StreamExecGroupWindowAggregateBase.scala:54)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToTransformation(StreamExecLegacySink.scala:165)
    at 
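
A hedged workaround sketch, not from the original report: the built-in
LAST_VALUE lacks the merge() method that HOP windows require, so a
user-defined aggregate that does implement merge() can stand in for it. The
class name is illustrative, and note that "last" across merged panes is only
as meaningful as your arrival order:

{code:java}
import org.apache.flink.table.functions.AggregateFunction;

public class MyLastValue extends AggregateFunction<String, MyLastValue.Acc> {

    public static class Acc {
        public String last;
    }

    @Override
    public Acc createAccumulator() {
        return new Acc();
    }

    @Override
    public String getValue(Acc acc) {
        return acc.last;
    }

    public void accumulate(Acc acc, String value) {
        acc.last = value;
    }

    // merge() is what the planner's genMerge step looks for when translating
    // HOP windows; take the latest non-null value from the merged panes.
    public void merge(Acc acc, Iterable<Acc> others) {
        for (Acc other : others) {
            if (other.last != null) {
                acc.last = other.last;
            }
        }
    }
}
// old-style registration in 1.11:
// tableEnv.registerFunction("my_last_value", new MyLastValue());
{code}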

[jira] [Created] (FLINK-20439) Consider simplifying or removing mechanism to scheduleOrUpdateConsumers

2020-12-01 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-20439:
---

 Summary: Consider simplifying or removing mechanism to 
scheduleOrUpdateConsumers
 Key: FLINK-20439
 URL: https://issues.apache.org/jira/browse/FLINK-20439
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Zhu Zhu
 Fix For: 1.13.0


Execution#scheduleOrUpdateConsumers() was used for multiple purposes:
- schedule a vertex when its PIPELINED inputs have produced data 
- schedule a vertex when its BLOCKING inputs have finished
- update consumed partition info to RUNNING consumers
- cache consumed partition info for DEPLOYING consumers

It is not needed anymore in the latest pipelined region scheduling because 
- a region will be scheduled only when all its upstream regions have finished
- a vertex will always know all its consumed partitions when scheduled

So we can consider how to simplify or remove it, which also involves things like 
UnknownInputChannel, ResultPartitionConsumableNotifier, 
Execution#cachePartitionInfo(), etc.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] Moving to JUnit5

2020-12-01 Thread Khachatryan Roman
+1 for the migration

(I agree with Dawid, for me the most important benefit is better support of
parameterized tests).

Regards,
Roman
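
For concreteness, a minimal sketch (not from this thread) of two of the
features mentioned below, lambda-based exception assertions and parameterized
tests living in the same class; all test and helper names are hypothetical:

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

class TimeoutValidationTest {

    @Test
    void rejectsNegativeTimeout() {
        // Java 8 lambda instead of @Test(expected = ...) or try-catch
        IllegalArgumentException e = assertThrows(
                IllegalArgumentException.class, () -> validateTimeout(-1));
        assertEquals("timeout must be positive", e.getMessage());
    }

    @ParameterizedTest
    @ValueSource(longs = {1, 1000, Long.MAX_VALUE})
    void acceptsPositiveTimeouts(long timeout) {
        // parameterized and non-parameterized tests share one class in JUnit 5
        assertEquals(timeout, validateTimeout(timeout));
    }

    private static long validateTimeout(long timeout) {
        if (timeout <= 0) {
            throw new IllegalArgumentException("timeout must be positive");
        }
        return timeout;
    }
}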


On Mon, Nov 30, 2020 at 9:42 PM Arvid Heise  wrote:

> Hi Till,
>
> immediate benefit would be mostly nested tests for a better test structure
> and new parameterized tests for less clutter (often test functionality is
> split into parameterized test and non-parameterized test because of JUnit4
> limitation). Additionally, having Java8 lambdas to perform fine-grain
> exception handling would make all related tests more readable (@Test only
> allows one exception per test method, while in reality we often have more
> exceptions / more fine grain assertions and need to resort to try-catch --
> yuck!). The extension mechanism would also make the mini cluster much
> easier to use: we often have to start the cluster manually because of
> test-specific configuration, which can be easily avoided in JUnit5.
>
> In the medium and long-term, I'd also like to use the modular
> infrastructure and improved parallelization. The former would allow us
> better to implement cross-cutting features like TestLogger (why do we need
> to extend that manually in every test?). The latter is more relevant for
> the next push on CI, which would be especially interesting with e2e being
> available in Java.
>
> On Mon, Nov 30, 2020 at 2:07 PM Dawid Wysakowicz 
> wrote:
>
> > Hi all,
> >
> > Just wanted to express my support for the idea. I did miss certain
> > features of JUnit 5 already, an important one being much better support
> > for parameterized tests.
> >
> > Best,
> >
> > Dawid
> >
> > On 30/11/2020 13:50, Arvid Heise wrote:
> > > Hi Chesnay,
> > >
> > > The vintage runner supports the old annotations, so we don't have to
> > change
> > > them in the first step.
> > >
> > > The only thing that we need to change are all rules that do not extend
> > > ExternalResource (e.g., TestWatcher used in TestLogger). This change
> > needs
> > > to be done swiftly as this affects the shared infrastructure as you
> have
> > > mentioned.
> > >
> > > Only afterwards, we start to actually migrate the individual tests.
> That
> > > can be done module by module or as we go. I actually found a nice
> article
> > > that leverages the migration assist of IntelliJ [1].
> > >
> > > As the last stop, we remove the vintage runner - all JUnit4 tests have
> > been
> > > migrated and new tests cannot use old annotation etc. anymore.
> > >
> > > [1]
> > >
> >
> https://blog.jetbrains.com/idea/2020/08/migrating-from-junit-4-to-junit-5/
> > >
> > > On Mon, Nov 30, 2020 at 1:32 PM Chesnay Schepler 
> > wrote:
> > >
> > >> I presume we cannot do the migration module-wise due to shared test
> > >> utilities that rely on JUnit interfaces?
> > >>
> > >> On 11/30/2020 1:30 PM, Chesnay Schepler wrote:
> > >>> Is it feasible that 2 people can do the migration within a short
> > >>> time-frame (say, a week)?
> > >>> Must the migration of a test be done in one go, or can we for example
> > >>> first rename all the Before/After annotations and then to the rest?
> > >>> Are there any issues with other test dependencies (i.e., hamcrest,
> > >>> powermock (PowerMockRunner), mockito) that we should be aware of?
> > >>>
> > >>> I generally like the idea of using JUnit 5, but am wary of this
> > >>> migration dragging on for too long.
> > >>>
> > >>> On 11/27/2020 3:29 PM, Arvid Heise wrote:
> >  Dear devs,
> > 
> >  I'd like to start a discussion to migrate to a higher JUnit version.
> > 
> >  The main motivations are:
> >  - Making full use of Java 8 Lambdas for writing easier to read tests
> >  and a
> >  better performing way of composing failure messages.
> >  - Improved test structures with nested and dynamic tests.
> >  - Much better support for parameterized tests to avoid separating
> >  parameterized and non-parameterized parts into different test
> classes.
> >  - Composable dependencies and better hooks for advanced use cases
> >  (TestLogger).
> >  - Better exception verification
> >  - More current infrastructure
> >  - Better parallelizable
> > 
> >  Why now?
> >  - JUnit5 is now mature enough to consider it for such a complex
> > project
> >  - We are porting more and more e2e tests to JUnit and it would be a
> >  pity to
> >  do all the work twice (okay some already has been done and would
> >  result in
> >  adjustments, but the sooner we migrate, the less needs to be touched
> >  twice)
> > 
> >  Why JUnit5?
> >  There are other interesting alternatives, such as TestNG. I'm happy
> >  to hear
> >  specific alternatives. For now, I'd like to focus on JUnit4 for an
> >  easier
> >  migration path.
> > 
> >  Please discuss if you would also be interested in moving onward. To
> > get
> >  some overview, I'd like to see some informal +1 for the options:
> > 
> >  [ ] Stick to 

[jira] [Created] (FLINK-20438) org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeServiceTest fails due to missing output

2020-12-01 Thread Matthias (Jira)
Matthias created FLINK-20438:


 Summary: 
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeServiceTest fails 
due to missing output
 Key: FLINK-20438
 URL: https://issues.apache.org/jira/browse/FLINK-20438
 Project: Flink
  Issue Type: Test
  Components: API / DataStream
Affects Versions: 1.13.0
Reporter: Matthias


[This 
build|https://dev.azure.com/mapohl/flink/_build/results?buildId=122&view=logs&j=9dc1b5dc-bcfa-5f83-eaa7-0cb181ddc267&t=ab910030-93db-52a7-74a3-34a0addb481b]
 failed due to 
{{org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeServiceTest}} 
not producing any output for 900 seconds.

The Thread Stacktraces are printed in the build's output. The last test that 
didn't finish is {{SystemProcessingTimeServiceTest}} according to the build 
artifact's {{mvn-2.log}}:

{noformat}
# ...

Test 
testShutdownAndWaitPending(org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeServiceTest)
 is running.

15:27:56,727 [main] INFO  
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeServiceTest [] - 

Test 
testShutdownAndWaitPending(org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeServiceTest)
 successfully run.

15:27:56,728 [main] INFO  
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeServiceTest [] - 

Test 
testQuiesceAndAwaitingCancelsScheduledAtFixRateFuture(org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeServiceTest)
 is running.
{noformat}




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20437) Port ExecNode to Java

2020-12-01 Thread godfrey he (Jira)
godfrey he created FLINK-20437:
--

 Summary: Port ExecNode to Java
 Key: FLINK-20437
 URL: https://issues.apache.org/jira/browse/FLINK-20437
 Project: Flink
  Issue Type: Sub-task
Reporter: godfrey he






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20436) Simplify type parameter of ExecNode

2020-12-01 Thread godfrey he (Jira)
godfrey he created FLINK-20436:
--

 Summary: Simplify type parameter of ExecNode
 Key: FLINK-20436
 URL: https://issues.apache.org/jira/browse/FLINK-20436
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: godfrey he
Assignee: godfrey he






--
This message was sent by Atlassian Jira
(v8.3.4#803005)