[jira] [Commented] (FLINK-10206) Add hbase stream connector
[ https://issues.apache.org/jira/browse/FLINK-10206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16592501#comment-16592501 ]

Shimin Yang commented on FLINK-10206:

Our streaming job relies heavily on HBase as a sink. I am willing to implement the streaming part of the HBase sink. Can we split this into two subtasks, one for batch and one for streaming?

> Add hbase stream connector
>
> Key: FLINK-10206
> URL: https://issues.apache.org/jira/browse/FLINK-10206
> Project: Flink
> Issue Type: Improvement
> Components: Streaming Connectors
> Affects Versions: 1.6.0
> Reporter: Igloo
> Assignee: Shimin Yang
> Priority: Critical
> Fix For: 1.6.1
>
> Original Estimate: 336h
> Remaining Estimate: 336h
>
> Currently, there is an HBase connector only for batch operations.
>
> In some cases, we need to save streaming results into HBase, just like
> the Cassandra streaming sink implementations.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[GitHub] tragicjun commented on a change in pull request #6508: [Flink-10079] [table] Automatically register sink table from external catalogs
tragicjun commented on a change in pull request #6508: [Flink-10079] [table] Automatically register sink table from external catalogs
URL: https://github.com/apache/flink/pull/6508#discussion_r210471287

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ##
@@ -750,7 +751,28 @@ abstract class TableEnvironment(val config: TableConfig) {
     if (null == sinkTableName) throw TableException("Name of TableSink must not be null.")
     if (sinkTableName.isEmpty) throw TableException("Name of TableSink must not be empty.")
     if (!isRegistered(sinkTableName)) {
-      throw TableException(s"No table was registered under the name $sinkTableName.")
+      // try resolving and registering sink table from registered external catalogs
+      try {
+        val paths = sinkTableName.split("\\.")
+        if (paths.length > 1) {

 Review comment:
 @walterddr thanks for reviewing. I assume the full path is required for both source and sink tables, since there is currently no support for switching the catalog/database in the style of "**USE** _database_". If a fully qualified name is enforced, the first element of the path must be the `catalog` name and the last element must be the `table` name.

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
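A minimal sketch of the naming convention described in the review comment, assuming a dot-separated name whose first element is the catalog, whose last element is the table, and whose elements in between (if any) are database/schema levels. `QualifiedSinkName` and `parse` are hypothetical names for illustration only; this is not the Scala code in the PR, which is truncated above.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper illustrating "catalog first, table last" for fully
// qualified sink table names; not part of Flink's API.
final class QualifiedSinkName {
    final String catalog;
    final List<String> intermediate; // database/schema levels between catalog and table
    final String table;

    private QualifiedSinkName(String catalog, List<String> intermediate, String table) {
        this.catalog = catalog;
        this.intermediate = intermediate;
        this.table = table;
    }

    static QualifiedSinkName parse(String name) {
        // Split on a literal dot; limit -1 keeps trailing empty strings,
        // so a name such as "cat.tbl." is rejected below instead of silently accepted.
        String[] parts = name.split("\\.", -1);
        if (parts.length < 2) {
            throw new IllegalArgumentException(
                "Expected <catalog>[.<database>...].<table>, got: " + name);
        }
        for (String part : parts) {
            if (part.isEmpty()) {
                throw new IllegalArgumentException("Empty path element in: " + name);
            }
        }
        List<String> middle = Collections.unmodifiableList(
            Arrays.asList(parts).subList(1, parts.length - 1));
        return new QualifiedSinkName(parts[0], middle, parts[parts.length - 1]);
    }
}
```

For example, `QualifiedSinkName.parse("myCatalog.myDb.mySink")` yields catalog `myCatalog`, intermediate `[myDb]`, and table `mySink`, while a bare `mySink` is rejected because, without a `USE`-style default, there is no catalog to resolve it against.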
[jira] [Assigned] (FLINK-6344) Migrate from Java serialization for BucketingSink's state
[ https://issues.apache.org/jira/browse/FLINK-6344?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

vinoyang reassigned FLINK-6344:

Assignee: vinoyang  (was: Hai Zhou)

> Migrate from Java serialization for BucketingSink's state
>
> Key: FLINK-6344
> URL: https://issues.apache.org/jira/browse/FLINK-6344
> Project: Flink
> Issue Type: Sub-task
> Components: Streaming Connectors
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: vinoyang
> Priority: Major
>
> See umbrella JIRA FLINK-6343 for details. This subtask tracks the migration
> for `BucketingSink`.
[jira] [Assigned] (FLINK-6347) Migrate from Java serialization for MessageAcknowledgingSourceBase's state
[ https://issues.apache.org/jira/browse/FLINK-6347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

vinoyang reassigned FLINK-6347:

Assignee: vinoyang  (was: Hai Zhou)

> Migrate from Java serialization for MessageAcknowledgingSourceBase's state
>
> Key: FLINK-6347
> URL: https://issues.apache.org/jira/browse/FLINK-6347
> Project: Flink
> Issue Type: Sub-task
> Components: Streaming Connectors
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: vinoyang
> Priority: Major
>
> See umbrella JIRA FLINK-6343 for details. This subtask tracks the migration
> for {{MessageAcknowledgingSourceBase}}.
[jira] [Commented] (FLINK-10214) why is job applying as many TMs as default parallelism when starting, each parallelism is 1
[ https://issues.apache.org/jira/browse/FLINK-10214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16592461#comment-16592461 ]

lzh9 commented on FLINK-10214:

[~yanghua], the parallelism is 6*8=48. If it is set to 1, the job requests only one TM and uses one slot.

> why is job applying as many TMs as default parallelism when starting, each
> parallelism is 1
>
> Key: FLINK-10214
> URL: https://issues.apache.org/jira/browse/FLINK-10214
> Project: Flink
> Issue Type: Task
> Components: Cluster Management, TaskManager, YARN
> Affects Versions: 1.5.0
> Reporter: lzh9
> Priority: Major
>
> If I set TM number=6, slots per TM=8, and memory per TM=4G, the job
> requests 48 TMs and uses only one slot in each, for a total memory usage
> of 192G. A few minutes later, it releases 40 TMs and everything returns
> to normal.
> Is there any reason for this?
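The numbers in this report can be checked with a little arithmetic. A minimal sketch, assuming slots are packed densely into TaskManagers (which is what the reporter expected from 6 TMs with 8 slots each); `SlotMath` is a hypothetical helper, not Flink code.

```java
// Hypothetical helper for the slot/TaskManager arithmetic in FLINK-10214.
final class SlotMath {
    // TaskManagers needed when each TM contributes slotsPerTm slots (ceiling division).
    static int packedTaskManagers(int parallelism, int slotsPerTm) {
        if (parallelism < 0 || slotsPerTm <= 0) {
            throw new IllegalArgumentException("need parallelism >= 0 and slotsPerTm > 0");
        }
        return (parallelism + slotsPerTm - 1) / slotsPerTm;
    }

    // Total memory requested by a number of equally sized TaskManagers.
    static int totalMemoryGb(int taskManagers, int memoryPerTmGb) {
        return taskManagers * memoryPerTmGb;
    }
}
```

With parallelism 48 and 8 slots per TM, dense packing needs `packedTaskManagers(48, 8)`, i.e. 6 TMs, or 6 × 4 GB = 24 GB, whereas one TM per slot request gives 48 TMs and 48 × 4 GB = 192 GB, matching the figure in the report.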
[jira] [Assigned] (FLINK-7624) Add kafka-topic for "KafkaProducer" metrics
[ https://issues.apache.org/jira/browse/FLINK-7624?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

vinoyang reassigned FLINK-7624:

Assignee: vinoyang

> Add kafka-topic for "KafkaProducer" metrics
>
> Key: FLINK-7624
> URL: https://issues.apache.org/jira/browse/FLINK-7624
> Project: Flink
> Issue Type: Bug
> Components: Metrics
> Reporter: Hai Zhou
> Assignee: vinoyang
> Priority: Major
>
> Currently, the metrics in the "KafkaProducer" MetricGroup look like this:
> {code:java}
> localhost.taskmanager.dc4092a96ea4e54ecdbd13b9a5c209b2.Flink Streaming Job.Sink--MTKafkaProducer08.0.KafkaProducer.record-queue-time-avg
> {code}
> The metric names in the "KafkaProducer" group do not include the Kafka topic
> name, so if a job writes data to two different Kafka sinks, these metrics
> cannot be distinguished.
> I would like to change the above metric name as follows:
> {code:java}
> localhost.taskmanager.dc4092a96ea4e54ecdbd13b9a5c209b2.Flink Streaming Job.Sink--MTKafkaProducer08.0.KafkaProducer.<topic>.record-queue-time-avg
> {code}
> Best,
> Hai Zhou
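To make the collision concrete: a minimal sketch, assuming metric identifiers are the scope components joined with dots as in the example above, and that both sinks share the same operator name and subtask index, as the report implies. `MetricNames` and `identifier` are hypothetical; in Flink itself, adding the topic would presumably mean registering the producer metrics under a nested group (for example via `MetricGroup#addGroup(String)`) rather than building strings by hand.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of why a topic component is needed in the identifier.
final class MetricNames {
    // Scope taken from the example in the issue.
    static final String SCOPE =
        "localhost.taskmanager.dc4092a96ea4e54ecdbd13b9a5c209b2.Flink Streaming Job.Sink--MTKafkaProducer08.0";

    // Joins scope, group, an optional topic (null = omitted), and the metric name with dots.
    static String identifier(String group, String topic, String metric) {
        List<String> parts = new ArrayList<>(Arrays.asList(SCOPE, group));
        if (topic != null) {
            parts.add(topic);
        }
        parts.add(metric);
        return String.join(".", parts);
    }
}
```

Without a topic, two producers writing to different topics both report under `identifier("KafkaProducer", null, "record-queue-time-avg")`; passing the topic yields two distinct identifiers.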
[jira] [Commented] (FLINK-10214) why is job applying as many TMs as default parallelism when starting, each parallelism is 1
[ https://issues.apache.org/jira/browse/FLINK-10214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16592449#comment-16592449 ]

vinoyang commented on FLINK-10214:

[~lzh9] What is the parallelism of your job? Since FLIP-6, Flink has adopted an elastic resource allocation model.
[jira] [Commented] (FLINK-10212) REST API for listing all the available save points
[ https://issues.apache.org/jira/browse/FLINK-10212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16592436#comment-16592436 ]

vinoyang commented on FLINK-10212:

Hi [~mrooding], good thoughts. I think GET better matches RESTful semantics. Are you going to work on this issue? If not, I'd like to take it.

> REST API for listing all the available save points
>
> Key: FLINK-10212
> URL: https://issues.apache.org/jira/browse/FLINK-10212
> Project: Flink
> Issue Type: New Feature
> Reporter: Marc Rooding
> Priority: Major
>
> *Background*
> I'm one of the authors of the open-source Flink job deployer
> ([https://github.com/ing-bank/flink-deployer]). Recently, I rewrote our
> implementation to use the Flink REST API instead of the native CLI.
> In our use case, we store the job savepoints in a Kubernetes persistent
> volume. For our deployer, we mount the persistent volume to our deployer
> container so that we can find and use the savepoints.
> In the rewrite to the REST API, I saw that the API to monitor savepoint
> creation returns the complete path to the created savepoint, and we can use
> it in the job deployer to start the new job with the latest savepoint.
> However, we also allow users to deploy a job with a recovered state by
> specifying only the directory savepoints are stored in. In this scenario we
> look for the latest savepoint created for this job ourselves inside the
> given directory. To find this path, we still rely on the mounted volume
> and list the directory contents to discover savepoints.
> *Feature*
> I was thinking that it might be a good addition if the native Flink REST API
> offered the ability to retrieve savepoints. Since the API doesn't
> inherently know where savepoints are stored, it could take a path as one of
> the arguments. It could even allow the user to provide a job ID as an
> argument so that the API would be able to search for savepoints for a
> specific job ID in the specified directory.
> As the API would require the path as an argument, and providing a path
> containing forward slashes in the URL isn't ideal, I'm eager to discuss what
> a proper solution would look like.
> A POST request to /jobs/:jobid/savepoints with the path as a body parameter
> would make sense if the API were to offer listing all savepoints in a
> specific path, but this request is already used for creating new
> savepoints.
> An alternative could be a POST to /savepoints with the path and job ID in the
> request body.
> A POST request to retrieve data is obviously not the most straightforward
> approach, but in my opinion it is still preferable to a GET to, for example,
> /jobs/:jobid/savepoints/:targetDirectory
> I'm willing to help out on this one by submitting a pull request.
> Looking forward to your thoughts!
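For reference, the client-side lookup that the deployer currently performs against the mounted volume can be sketched in a few lines. This is a minimal sketch under two assumptions: savepoint directories follow Flink's default `savepoint-<job-id prefix>-<random suffix>` naming, and "latest" means the most recent modification time. `LatestSavepoint.find` is a hypothetical helper, not part of Flink or of the flink-deployer project.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileTime;
import java.util.Comparator;
import java.util.Optional;
import java.util.stream.Stream;

// Hypothetical helper: finds the newest savepoint directory for a job in a target directory.
final class LatestSavepoint {
    // jobIdPrefix may be empty to match savepoints of any job.
    static Optional<Path> find(Path targetDirectory, String jobIdPrefix) {
        String prefix = "savepoint-" + jobIdPrefix;
        try (Stream<Path> entries = Files.list(targetDirectory)) {
            return entries
                .filter(Files::isDirectory)
                .filter(p -> p.getFileName().toString().startsWith(prefix))
                .max(Comparator.comparing(LatestSavepoint::lastModified));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Wraps the checked exception so the method can be used as a key extractor.
    private static FileTime lastModified(Path path) {
        try {
            return Files.getLastModifiedTime(path);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A server-side endpoint along the lines proposed in the issue would presumably do the same scan on the JobManager side, which is why it needs the target directory (and optionally the job ID) as input.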
[jira] [Commented] (FLINK-10136) Add REPEAT supported in Table API and SQL
[ https://issues.apache.org/jira/browse/FLINK-10136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16592433#comment-16592433 ]

ASF GitHub Bot commented on FLINK-10136:

yanghua commented on a change in pull request #6597: [FLINK-10136] [table] Add REPEAT supported in Table API and SQL
URL: https://github.com/apache/flink/pull/6597#discussion_r212788014

## File path: docs/dev/table/functions.md ##
@@ -2614,6 +2626,18 @@ STRING.rtrim()
 E.g., 'This is a test String. '.rtrim() returns "This is a test String.".
+
+
+
+{% highlight java %}
+STRING.repeat(INT)
+{% endhighlight %}
+
+
+Returns a string that repeats the base STRING INT times.
+E.g., "This is a test String. ".repeat(2) returns "This is a test String.This is a test String.".

 Review comment:
 Thank you very much. @xccui

> Add REPEAT supported in Table API and SQL
>
> Key: FLINK-10136
> URL: https://issues.apache.org/jira/browse/FLINK-10136
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: vinoyang
> Assignee: vinoyang
> Priority: Minor
> Labels: pull-request-available
> Fix For: 1.7.0
>
> Oracle :
> [https://docs.oracle.com/cd/E17952_01/mysql-5.1-en/string-functions.html#function_repeat]
> MySql:
> https://dev.mysql.com/doc/refman/5.5/en/string-functions.html#function_repeat
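The documented semantics can be pinned down with a small reference implementation. A minimal sketch, assuming the edge-case behavior described in the MySQL reference linked in the issue (a count less than 1 yields an empty string; a NULL argument yields NULL); whether Flink's `ScalarFunctions.repeat` handles these cases the same way is not confirmed here. `RepeatSketch` is a hypothetical name. Note that a trailing space in the base string is repeated too, so `"This is a test String. ".repeat(2)` would produce `"This is a test String. This is a test String. "`.

```java
// Hypothetical reference implementation of REPEAT(string, count) semantics.
final class RepeatSketch {
    static String repeat(String base, Integer count) {
        if (base == null || count == null) {
            return null; // SQL-style NULL propagation
        }
        if (count < 1) {
            return ""; // MySQL documents an empty result for counts below 1
        }
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < count; i++) {
            sb.append(base); // the base string is copied verbatim, whitespace included
        }
        return sb.toString();
    }
}
```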
[jira] [Commented] (FLINK-9610) Add Kafka partitioner that uses the key to partition by
[ https://issues.apache.org/jira/browse/FLINK-9610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16592422#comment-16592422 ]

ASF GitHub Bot commented on FLINK-9610:

cricket007 commented on issue #6181: [FLINK-9610] [flink-connector-kafka-base] Add Kafka Partitioner that uses the hash of the provided key.
URL: https://github.com/apache/flink/pull/6181#issuecomment-415930190

   Kafka uses a Murmur2 hash, not `Arrays.hashCode`:
   https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java#L69

> Add Kafka partitioner that uses the key to partition by
>
> Key: FLINK-9610
> URL: https://issues.apache.org/jira/browse/FLINK-9610
> Project: Flink
> Issue Type: New Feature
> Components: Kafka Connector
> Reporter: Niels Basjes
> Assignee: Niels Basjes
> Priority: Major
> Labels: pull-request-available
>
> The Kafka connector package only contains the FlinkFixedPartitioner
> implementation of the FlinkKafkaPartitioner.
> The most common use case I have seen is the need to spread the records across
> the Kafka partitions while keeping all messages with the same key together.
> I'll put up a pull request with a very simple implementation that should make
> this a lot easier for others to use and extend.
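To illustrate the comment: Kafka's default partitioner hashes the serialized key bytes with Murmur2 and maps the result onto a partition, so a Flink partitioner based on `Arrays.hashCode` could place the same key on a different partition than a plain Kafka producer would. The following minimal sketch of that scheme is transcribed from memory of Kafka's `Utils.murmur2` and `Utils.toPositive`; the class and method names are hypothetical, and it should be checked against the linked `DefaultPartitioner` source before being relied on for cross-client compatibility.

```java
// Hypothetical transcription of the key-hash partitioning used by Kafka's DefaultPartitioner.
final class Murmur2Partitioner {
    // 32-bit Murmur2 over the key bytes, using Kafka's seed.
    static int murmur2(byte[] data) {
        final int length = data.length;
        final int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;

        int h = seed ^ length;
        final int length4 = length / 4;

        // Process the input four bytes at a time (little-endian).
        for (int i = 0; i < length4; i++) {
            final int i4 = i * 4;
            int k = (data[i4] & 0xff)
                + ((data[i4 + 1] & 0xff) << 8)
                + ((data[i4 + 2] & 0xff) << 16)
                + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }

        // Mix in the remaining 1-3 bytes; the cases fall through intentionally.
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
        }

        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Clears the sign bit so the modulo below is never negative.
    static int toPositive(int number) {
        return number & 0x7fffffff;
    }

    static int partition(byte[] keyBytes, int numPartitions) {
        return toPositive(murmur2(keyBytes)) % numPartitions;
    }
}
```

Masking the sign bit instead of calling `Math.abs` avoids the `Math.abs(Integer.MIN_VALUE)` pitfall, which would otherwise produce a negative partition index.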
[jira] [Created] (FLINK-10214) why is job applying as many TMs as default parallelism when starting, each parallelism is 1
lzh9 created FLINK-10214:

Summary: why is job applying as many TMs as default parallelism when starting, each parallelism is 1
Key: FLINK-10214
URL: https://issues.apache.org/jira/browse/FLINK-10214
Project: Flink
Issue Type: Task
Components: Cluster Management, TaskManager, YARN
Affects Versions: 1.5.0
Reporter: lzh9

If I set TM number=6, slots per TM=8, and memory per TM=4G, the job requests 48 TMs and uses only one slot in each, for a total memory usage of 192G. A few minutes later, it releases 40 TMs and everything returns to normal.
Is there any reason for this?
[jira] [Resolved] (FLINK-10136) Add REPEAT supported in Table API and SQL
[ https://issues.apache.org/jira/browse/FLINK-10136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xingcan Cui resolved FLINK-10136.
Resolution: Done

Implemented in 1.7.0: 505dca174128ebb3bf765778ee36d58f680d6a1e
[jira] [Updated] (FLINK-10201) The batchTestUtil was mistakenly used in some stream sql tests
[ https://issues.apache.org/jira/browse/FLINK-10201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xingcan Cui updated FLINK-10201:
Fix Version/s: 1.7.0

> The batchTestUtil was mistakenly used in some stream sql tests
>
> Key: FLINK-10201
> URL: https://issues.apache.org/jira/browse/FLINK-10201
> Project: Flink
> Issue Type: Test
> Components: Table API & SQL
> Reporter: Xingcan Cui
> Assignee: Xingcan Cui
> Priority: Minor
> Labels: pull-request-available
> Fix For: 1.7.0
>
> The {{batchTestUtil}} was mistakenly used in the stream SQL tests
> {{SetOperatorsTest.testValuesWithCast()}} and
> {{CorrelateTest.testLeftOuterJoinAsSubQuery()}}. That should be fixed.
[jira] [Updated] (FLINK-10136) Add REPEAT supported in Table API and SQL
[ https://issues.apache.org/jira/browse/FLINK-10136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xingcan Cui updated FLINK-10136:
Fix Version/s: 1.7.0
[jira] [Commented] (FLINK-10136) Add REPEAT supported in Table API and SQL
[ https://issues.apache.org/jira/browse/FLINK-10136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16592376#comment-16592376 ]

ASF GitHub Bot commented on FLINK-10136:

asfgit closed pull request #6597: [FLINK-10136] [table] Add REPEAT supported in Table API and SQL
URL: https://github.com/apache/flink/pull/6597

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/dev/table/functions.md b/docs/dev/table/functions.md
index 24d8d70080b..759cf2f8db0 100644
--- a/docs/dev/table/functions.md
+++ b/docs/dev/table/functions.md
@@ -2401,6 +2401,18 @@ RTRIM(string)
+
+
+{% highlight text %}
+REPEAT(string, integer)
+{% endhighlight %}
+
+
+Returns a string that repeats the base string integer times.
+E.g., REPEAT('This is a test String.', 2) returns "This is a test String.This is a test String.".
+
+
+
 {% highlight text %}
@@ -2614,6 +2626,18 @@ STRING.rtrim()
 E.g., 'This is a test String. '.rtrim() returns "This is a test String.".
+
+
+
+{% highlight java %}
+STRING.repeat(INT)
+{% endhighlight %}
+
+
+Returns a string that repeats the base STRING INT times.
+E.g., "This is a test String. ".repeat(2) returns "This is a test String.This is a test String.".
+
+
@@ -2830,6 +2854,18 @@ STRING.rtrim()
+
+
+{% highlight scala %}
+STRING.repeat(INT)
+{% endhighlight %}
+
+
+Returns a string that repeats the base STRING INT times.
+E.g., "This is a test String. ".repeat(2) returns "This is a test String.This is a test String.".
+
+
+
 {% highlight scala %}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
index 8b08af68117..a0cfac65923 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
@@ -573,6 +573,11 @@ trait ImplicitExpressionOperations {
    */
   def rtrim() = RTrim(expr)
+  /**
+    * Returns a string that repeats the base string n times.
+    */
+  def repeat(n: Expression) = Repeat(expr, n)
+
   // Temporal operations
   /**
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
index 7eb91d3806d..899cb0ff35a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
@@ -141,4 +141,10 @@ object BuiltInMethods {
   val HEX_STRING: Method = Types.lookupMethod(classOf[ScalarFunctions], "hex", classOf[String])
   val UUID: Method = Types.lookupMethod(classOf[ScalarFunctions], "uuid")
+
+  val REPEAT: Method = Types.lookupMethod(
+    classOf[ScalarFunctions],
+    "repeat",
+    classOf[String],
+    classOf[Int])
 }
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
index 7c328c98be1..c7eb869a5b5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
@@ -176,6 +176,12 @@ object FunctionGenerator {
     STRING_TYPE_INFO,
     BuiltInMethod.RTRIM.method)
+  addSqlFunctionMethod(
+    REPEAT,
+    Seq(STRING_TYPE_INFO, INT_TYPE_INFO),
+    STRING_TYPE_INFO,
+    BuiltInMethods.REPEAT)
+
   // --
   // Arithmetic functions
   // --
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
index b2d7a3deb18..70794407cc0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/stringExpressions.scala
@@ -459,3 +459,31 @@ case class RTrim(child: Expression) extends UnaryExpression with InputTypeSpec {
   override def toString = s"($child).rtrim"
 }
+
+/**
+ * Returns a stri
[jira] [Commented] (FLINK-10201) The batchTestUtil was mistakenly used in some stream sql tests
[ https://issues.apache.org/jira/browse/FLINK-10201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16592372#comment-16592372 ]

ASF GitHub Bot commented on FLINK-10201:

xccui closed pull request #6605: [FLINK-10201] [table] [test] The batchTestUtil was mistakenly used in some stream sql tests
URL: https://github.com/apache/flink/pull/6605

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CorrelateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CorrelateTest.scala
index efb83b456ed..f179ae6cfac 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CorrelateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CorrelateTest.scala
@@ -106,7 +106,7 @@ class CorrelateTest extends TableTestBase {
   @Test
   def testLeftOuterJoinAsSubQuery(): Unit = {
-    val util = batchTestUtil()
+    val util = streamTestUtil()
     val func1 = new TableFunc1
     util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
     util.addTable[(Int, Long, String)]("MyTable2", 'a2, 'b2, 'c2)
@@ -121,13 +121,13 @@ class CorrelateTest extends TableTestBase {
         | ON c2 = s
       """.stripMargin
     val expected = binaryNode(
-      "DataSetJoin",
-      batchTableNode(1),
+      "DataStreamJoin",
+      streamTableNode(1),
       unaryNode(
-        "DataSetCalc",
+        "DataStreamCalc",
         unaryNode(
-          "DataSetCorrelate",
-          batchTableNode(0),
+          "DataStreamCorrelate",
+          streamTableNode(0),
           term("invocation", "func1($cor0.c)"),
           term("correlate", "table(func1($cor0.c))"),
           term("select", "a", "b", "c", "f0"),
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/SetOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/SetOperatorsTest.scala
index 97dbe0dad66..fb4b4b5f409 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/SetOperatorsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/SetOperatorsTest.scala
@@ -173,29 +173,26 @@ class SetOperatorsTest extends TableTestBase {
   @Test
   def testValuesWithCast(): Unit = {
-    val util = batchTestUtil()
+    val util = streamTestUtil()
     val expected = naryNode(
-      "DataSetUnion",
+      "DataStreamUnion",
       List(
-        unaryNode("DataSetCalc",
-          values("DataSetValues",
-            tuples(List("0")),
-            "values=[ZERO]"),
+        unaryNode("DataStreamCalc",
+          values("DataStreamValues",
+            tuples(List("0"))),
           term("select", "1 AS EXPR$0, 1 AS EXPR$1")),
-        unaryNode("DataSetCalc",
-          values("DataSetValues",
-            tuples(List("0")),
-            "values=[ZERO]"),
+        unaryNode("DataStreamCalc",
+          values("DataStreamValues",
+            tuples(List("0"))),
           term("select", "2 AS EXPR$0, 2 AS EXPR$1")),
-        unaryNode("DataSetCalc",
-          values("DataSetValues",
-            tuples(List("0")),
-            "values=[ZERO]"),
+        unaryNode("DataStreamCalc",
+          values("DataStreamValues",
+            tuples(List("0"))),
           term("select", "3 AS EXPR$0, 3 AS EXPR$1"))
       ),
       term("all", "true"),
-      term("union", "EXPR$0, EXPR$1")
+      term("union all", "EXPR$0, EXPR$1")
     )
     util.verifySql(
[jira] [Resolved] (FLINK-10201) The batchTestUtil was mistakenly used in some stream sql tests
[ https://issues.apache.org/jira/browse/FLINK-10201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xingcan Cui resolved FLINK-10201. - Resolution: Fixed Fixed in 1.7.0 6d28a65092ffe4a4390fccacb4deb7e403924f51
[GitHub] xccui closed pull request #6605: [FLINK-10201] [table] [test] The batchTestUtil was mistakenly used in some stream sql tests
xccui closed pull request #6605: [FLINK-10201] [table] [test] The batchTestUtil was mistakenly used in some stream sql tests URL: https://github.com/apache/flink/pull/6605 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CorrelateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CorrelateTest.scala
index efb83b456ed..f179ae6cfac 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CorrelateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CorrelateTest.scala
@@ -106,7 +106,7 @@ class CorrelateTest extends TableTestBase {
 
   @Test
   def testLeftOuterJoinAsSubQuery(): Unit = {
-    val util = batchTestUtil()
+    val util = streamTestUtil()
     val func1 = new TableFunc1
     util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
     util.addTable[(Int, Long, String)]("MyTable2", 'a2, 'b2, 'c2)
@@ -121,13 +121,13 @@ class CorrelateTest extends TableTestBase {
         | ON c2 = s
       """.stripMargin
     val expected = binaryNode(
-      "DataSetJoin",
-      batchTableNode(1),
+      "DataStreamJoin",
+      streamTableNode(1),
       unaryNode(
-        "DataSetCalc",
+        "DataStreamCalc",
         unaryNode(
-          "DataSetCorrelate",
-          batchTableNode(0),
+          "DataStreamCorrelate",
+          streamTableNode(0),
           term("invocation", "func1($cor0.c)"),
           term("correlate", "table(func1($cor0.c))"),
           term("select", "a", "b", "c", "f0"),
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/SetOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/SetOperatorsTest.scala
index 97dbe0dad66..fb4b4b5f409 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/SetOperatorsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/SetOperatorsTest.scala
@@ -173,29 +173,26 @@ class SetOperatorsTest extends TableTestBase {
 
   @Test
   def testValuesWithCast(): Unit = {
-    val util = batchTestUtil()
+    val util = streamTestUtil()
 
     val expected = naryNode(
-      "DataSetUnion",
+      "DataStreamUnion",
       List(
-        unaryNode("DataSetCalc",
-          values("DataSetValues",
-            tuples(List("0")),
-            "values=[ZERO]"),
+        unaryNode("DataStreamCalc",
+          values("DataStreamValues",
+            tuples(List("0"))),
           term("select", "1 AS EXPR$0, 1 AS EXPR$1")),
-        unaryNode("DataSetCalc",
-          values("DataSetValues",
-            tuples(List("0")),
-            "values=[ZERO]"),
+        unaryNode("DataStreamCalc",
+          values("DataStreamValues",
+            tuples(List("0"))),
           term("select", "2 AS EXPR$0, 2 AS EXPR$1")),
-        unaryNode("DataSetCalc",
-          values("DataSetValues",
-            tuples(List("0")),
-            "values=[ZERO]"),
+        unaryNode("DataStreamCalc",
+          values("DataStreamValues",
+            tuples(List("0"))),
           term("select", "3 AS EXPR$0, 3 AS EXPR$1"))
       ),
       term("all", "true"),
-      term("union", "EXPR$0, EXPR$1")
+      term("union all", "EXPR$0, EXPR$1")
     )
 
     util.verifySql(

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] fhueske commented on issue #6508: [Flink-10079] [table] Automatically register sink table from external catalogs
fhueske commented on issue #6508: [Flink-10079] [table] Automatically register sink table from external catalogs URL: https://github.com/apache/flink/pull/6508#issuecomment-415906486 Hi @tragicjun, sorry for the late reply. I'll have a look at this PR next week. Best, Fabian
[jira] [Commented] (FLINK-9086) Use INTERNAL_HADOOP_CLASSPATHS as classpath for launching Yarn cluster for Scala Shell
[ https://issues.apache.org/jira/browse/FLINK-9086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16592298#comment-16592298 ] ASF GitHub Bot commented on FLINK-9086: --- ruankd commented on issue #5768: [FLINK-9086] Use INTERNAL_HADOOP_CLASSPATHS as classpath for Yarn Session URL: https://github.com/apache/flink/pull/5768#issuecomment-415904593 Ping. > Use INTERNAL_HADOOP_CLASSPATHS as classpath for launching Yarn cluster for > Scala Shell > -- > > Key: FLINK-9086 > URL: https://issues.apache.org/jira/browse/FLINK-9086 > Project: Flink > Issue Type: Improvement > Components: Startup Shell Scripts >Affects Versions: 1.4.0 >Reporter: Keda Ruan >Priority: Major > Labels: pull-request-available > > Some environments don't have the Hadoop jars in {{HADOOP_CLASSPATH}}, so the > Flink Yarn cluster fails to start because of missing jars. For example, > Flink 1.4.0 drops the jersey dependency from its shaded Hadoop jar, so the Yarn > cluster fails to start with: > {quote}Exception in thread "main" java.lang.NoClassDefFoundError: > com/sun/jersey/core/util/FeaturesAndProperties > at java.lang.ClassLoader.defineClass1(Native Method) > at java.lang.ClassLoader.defineClass(ClassLoader.java:763) > {quote} > Since {{config.sh}} already defines {{INTERNAL_HADOOP_CLASSPATHS}}, which holds the > result of {{hadoop classpath}}, we can use it to improve the user > experience of starting a Flink Yarn cluster for the Scala Shell.
[jira] [Updated] (FLINK-9086) Use INTERNAL_HADOOP_CLASSPATHS as classpath for launching Yarn cluster for Scala Shell
[ https://issues.apache.org/jira/browse/FLINK-9086?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-9086: -- Labels: pull-request-available (was: )
[jira] [Commented] (FLINK-7477) Use "hadoop classpath" to augment classpath when available
[ https://issues.apache.org/jira/browse/FLINK-7477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16592297#comment-16592297 ] ASF GitHub Bot commented on FLINK-7477: --- ruankd commented on issue #4566: [FLINK-7477] [FLINK-7480] Various improvements to Flink scripts URL: https://github.com/apache/flink/pull/4566#issuecomment-415904348 Hey, I noticed that this [commit](https://github.com/apache/flink/commit/0a0f6ed6c3d6cff702e4322293340274bea5e7d9) is part of this PR but was not merged into branch 1.5 or 1.6, nor into master. I wonder whether it will be merged. > Use "hadoop classpath" to augment classpath when available > -- > > Key: FLINK-7477 > URL: https://issues.apache.org/jira/browse/FLINK-7477 > Project: Flink > Issue Type: Bug > Components: Startup Shell Scripts >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek >Priority: Major > Fix For: 1.4.0 > > > Currently, some cloud environments don't properly put the Hadoop jars into > {{HADOOP_CLASSPATH}} (or don't set {{HADOOP_CLASSPATH}} at all). We should > check in {{config.sh}} whether the {{hadoop}} binary is on the path and augment > our {{INTERNAL_HADOOP_CLASSPATHS}} with the result of {{hadoop classpath}} in > our scripts. > This will improve the out-of-the-box experience for users who otherwise have to > set {{HADOOP_CLASSPATH}} manually.
[jira] [Created] (FLINK-10213) Task managers cache a negative DNS lookup of the blob server indefinitely
Joey Echeverria created FLINK-10213: --- Summary: Task managers cache a negative DNS lookup of the blob server indefinitely Key: FLINK-10213 URL: https://issues.apache.org/jira/browse/FLINK-10213 Project: Flink Issue Type: Bug Components: TaskManager Affects Versions: 1.5.0 Reporter: Joey Echeverria

When the task manager establishes a connection with the resource manager, it gets the hostname and port of the blob server and uses them to create an {{InetSocketAddress}} instance. Per the documentation of the constructor: {quote}An attempt will be made to resolve the hostname into an InetAddress. If that attempt fails, the address will be flagged as _unresolved_{quote} Flink never checks whether the address was unresolved. Later, when executing a task that needs to download from the blob server, it uses that same {{InetSocketAddress}} instance to connect a {{Socket}}. This results in an exception similar to:

{noformat}
java.io.IOException: Failed to fetch BLOB 97799b827ef073e04178a99f0f40b00e/p-6d8ec2ad31337110819c7c3641fdb18d3793a7fb-72bf00066308f4b4d2a9c5aea593b41f from jobmanager:6124 and store it under /tmp/blobStore-d135961a-03cb-4542-af6d-cf378ff83c12/incoming/temp-00018669
	at org.apache.flink.runtime.blob.BlobClient.downloadFromBlobServer(BlobClient.java:191) ~[flink-dist_2.11-1.5.0.jar:1.5.0]
	at org.apache.flink.runtime.blob.AbstractBlobCache.getFileInternal(AbstractBlobCache.java:181) ~[flink-dist_2.11-1.5.0.jar:1.5.0]
	at org.apache.flink.runtime.blob.PermanentBlobCache.getFile(PermanentBlobCache.java:206) ~[flink-dist_2.11-1.5.0.jar:1.5.0]
	at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:120) ~[flink-dist_2.11-1.5.0.jar:1.5.0]
	at org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:863) [flink-dist_2.11-1.5.0.jar:1.5.0]
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579) [flink-dist_2.11-1.5.0.jar:1.5.0]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171]
Caused by: java.io.IOException: Could not connect to BlobServer at address flink-jobmanager:6124
	at org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:124) ~[flink-dist_2.11-1.5.0.jar:1.5.0]
	at org.apache.flink.runtime.blob.BlobClient.downloadFromBlobServer(BlobClient.java:165) ~[flink-dist_2.11-1.5.0.jar:1.5.0]
	... 6 more
Caused by: java.net.UnknownHostException: jobmanager
	at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:184) ~[?:1.8.0_171]
	at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) ~[?:1.8.0_171]
	at java.net.Socket.connect(Socket.java:589) ~[?:1.8.0_171]
	at java.net.Socket.connect(Socket.java:538) ~[?:1.8.0_171]
	at org.apache.flink.runtime.blob.BlobClient.<init>(BlobClient.java:118) ~[flink-dist_2.11-1.5.0.jar:1.5.0]
	at org.apache.flink.runtime.blob.BlobClient.downloadFromBlobServer(BlobClient.java:165) ~[flink-dist_2.11-1.5.0.jar:1.5.0]
	... 6 more
{noformat}

Because the same {{InetSocketAddress}} is reused, every subsequent task executed on that task manager fails the same way, and the only current workaround is to restart the task manager manually.
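One way to avoid reusing a failed lookup forever is to check isUnresolved() before connecting and, if the cached address never resolved, build a fresh InetSocketAddress so that the hostname is looked up again. The following is a minimal Java sketch under that assumption; BlobServerAddresses and resolveIfNeeded are hypothetical names, not Flink API, and an actual fix in Flink may be structured differently.

```java
import java.net.InetSocketAddress;

/**
 * Hypothetical helper (not Flink API): avoids reusing an InetSocketAddress
 * whose one-time DNS lookup failed.
 */
public final class BlobServerAddresses {

    private BlobServerAddresses() {}

    /**
     * Returns the cached address if it was resolved; otherwise builds a new
     * InetSocketAddress, whose constructor attempts a fresh hostname lookup.
     * If that lookup fails as well, the result is again flagged unresolved and
     * the caller can retry later instead of failing every task permanently.
     */
    public static InetSocketAddress resolveIfNeeded(InetSocketAddress cached) {
        if (!cached.isUnresolved()) {
            return cached;
        }
        // getHostString() returns the hostname without triggering a reverse lookup.
        // Note: the JVM itself still caches failed lookups for
        // networkaddress.cache.negative.ttl seconds (10 by default).
        return new InetSocketAddress(cached.getHostString(), cached.getPort());
    }
}
```

In this sketch, a caller would invoke resolveIfNeeded right before opening the Socket and keep the returned instance, so that a transient DNS failure costs one task attempt rather than every task on that task manager.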
[jira] [Commented] (FLINK-10195) RabbitMQ Source With Checkpointing Doesn't Backpressure Correctly
[ https://issues.apache.org/jira/browse/FLINK-10195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16592182#comment-16592182 ] Luka Jurukovski commented on FLINK-10195: - [~StephanEwen] As far as I can tell, no, although this has very much been a crash course in RabbitMQ for me. From the forums, it looks like prefetch.count is the usual way to handle this: the consumer tells RabbitMQ how many unacked messages to allow before it stops delivering. I.e., if prefetch.count is set to 10,000, RabbitMQ will deliver up to 10,000 unacknowledged messages and then wait for acknowledgements before sending more. I would imagine that Flink would not want to use this mechanism, because with acking tied to checkpointing it doesn't actually "backpressure": one would have to do a throughput calculation and hope that no variance in that number leaves Flink waiting on the next checkpoint. Additionally, since sync checkpointing is a feature, there is no guarantee that checkpoints happen at a regular interval.

Under the covers, the QueueingConsumer uses a LinkedBlockingQueue and appends to it with the "add" method. I tried changing it to an ArrayBlockingQueue with a set capacity and the blocking "put" method; however, this causes another problem: RabbitMQ sometimes terminates the connection to Flink when Flink doesn't dequeue fast enough (I've noticed this usually happens only when sync checkpointing is on and there are long-running checkpoints). According to some of the forums, this is because RabbitMQ has some sort of timeout on how long it is willing to wait when writing to a client's buffer. I have some ugly code that I am testing where I turn off the consumer when the buffer is full, and a monitoring thread turns it back on once the buffer drops below a certain capacity.
I don't know yet whether this approach will cause any other issues, and am testing more. I might be able to get rid of the monitoring thread, but I'll look into that once I've proven out this way of doing things. Any additional thoughts or comments are welcome. Sorry for the wall of text. > RabbitMQ Source With Checkpointing Doesn't Backpressure Correctly > - > > Key: FLINK-10195 > URL: https://issues.apache.org/jira/browse/FLINK-10195 > Project: Flink > Issue Type: Bug > Components: RabbitMQ Connector >Affects Versions: 1.4.0, 1.5.0, 1.5.1, 1.6.0 >Reporter: Luka Jurukovski >Priority: Major > > The connection between the RabbitMQ server and the client does not > apply backpressure correctly when auto-acking is disabled. This becomes very > problematic when a downstream process throttles data processing to a rate slower > than the one at which RabbitMQ sends data to the client. > The difference in records ends up being stored in Flink's heap space, > which grows indefinitely (or technically up to "Integer Max" deliveries). > Looking at RabbitMQ's metrics, the number of unacked messages forms a > steadily rising sawtooth shape. > Upon further investigation, this appears to be due to how the > QueueingConsumer works: messages are added to the BlockingQueue faster than > they are removed and processed, resulting in the behavior described above. > This may be intended behavior; however, this isn't explicitly obvious in the > documentation or in any of the examples I have seen.
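The workaround described in this comment (stop the consumer when the buffer is full, restart it once the buffer has drained) is essentially high/low-watermark hysteresis. Below is a dependency-free Java sketch of that idea, assuming pause and resume are supplied as callbacks (in the connector they might cancel and re-register the RabbitMQ consumer; that wiring is not shown). WatermarkBuffer is a hypothetical name, not connector code. Its offer method never blocks, which sidesteps the stalled-delivery problem reported with ArrayBlockingQueue.put, and messages already in flight after a pause are still accepted, so the buffer can briefly exceed the high watermark.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Hypothetical buffer with hysteresis: pauses the upstream consumer at the
 * high watermark and resumes it once the buffer drains to the low watermark.
 */
public final class WatermarkBuffer<T> {

    private final Deque<T> queue = new ArrayDeque<>(); // rejects null elements
    private final int highWatermark;
    private final int lowWatermark;
    private final Runnable pauseConsumer;
    private final Runnable resumeConsumer;
    private boolean paused;

    public WatermarkBuffer(int highWatermark, int lowWatermark,
                           Runnable pauseConsumer, Runnable resumeConsumer) {
        if (lowWatermark < 0 || lowWatermark >= highWatermark) {
            throw new IllegalArgumentException("require 0 <= lowWatermark < highWatermark");
        }
        this.highWatermark = highWatermark;
        this.lowWatermark = lowWatermark;
        this.pauseConsumer = pauseConsumer;
        this.resumeConsumer = resumeConsumer;
    }

    /** Called by the delivery thread; never blocks, so the broker connection is not stalled. */
    public synchronized void offer(T item) {
        queue.addLast(item);
        if (!paused && queue.size() >= highWatermark) {
            paused = true;
            pauseConsumer.run(); // runs under this lock, so it should return quickly
        }
    }

    /** Called by the source's run loop; returns null when the buffer is empty. */
    public synchronized T poll() {
        T item = queue.pollFirst();
        if (paused && queue.size() <= lowWatermark) {
            paused = false;
            resumeConsumer.run(); // resuming here replaces a separate monitoring thread
        }
        return item;
    }

    public synchronized boolean isPaused() {
        return paused;
    }

    public synchronized int size() {
        return queue.size();
    }
}
```

Checking the low watermark inside poll is one way to drop the separate monitoring thread; the trade-off is that the consumer is only resumed when the source actually polls, which is also the only time the buffer drains.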
[jira] [Commented] (FLINK-7243) Add ParquetInputFormat
[ https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16592021#comment-16592021 ] ASF GitHub Bot commented on FLINK-7243: --- HuangZhenQiu commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format URL: https://github.com/apache/flink/pull/6483#discussion_r212714129 ## File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java ## @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.formats.parquet; + +import org.apache.flink.api.common.io.CheckpointableInputFormat; +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.formats.parquet.utils.ParquetRecordReader; +import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter; +import org.apache.flink.formats.parquet.utils.RowReadSupport; +import org.apache.flink.metrics.Counter; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +import org.apache.parquet.ParquetReadOptions; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.apache.parquet.io.InputFile; +import org.apache.parquet.schema.MessageType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * The base InputFormat class to read from Parquet files. + * For specific return types the {@link #convert(Row)} method need to be implemented. + * + * Using {@link ParquetRecordReader} to Read files instead of {@link org.apache.flink.core.fs.FSDataInputStream}, + * we override {@link #open(FileInputSplit)} and {@link #close()} to change the behaviors. + * + * @param The type of record to read. + */ +public abstract class ParquetInputFormat extends FileInputFormat implements Review comment: Yes, it is considered. I will add new constructor with Filter for ParquetInputFormat. But I will leave the conversion logic from Flink expression provided by FilterableTableSource to Parquet FilterPredicate within Parquet table source. Thanks for the input. I will update the PR within the weekend. This is an automated message from the Apache Git Service. 
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add ParquetInputFormat > -- > > Key: FLINK-7243 > URL: https://issues.apache.org/jira/browse/FLINK-7243 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: godfrey he >Assignee: Zhenqiu Huang >Priority: Major > Labels: pull-request-available > > Add a {{ParquetInputFormat}} to read data from an Apache Parquet file.
[jira] [Commented] (FLINK-7243) Add ParquetInputFormat
[ https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16591984#comment-16591984 ] ASF GitHub Bot commented on FLINK-7243: --- HuangZhenQiu commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format URL: https://github.com/apache/flink/pull/6483#discussion_r212706499 ## File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java ## @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.formats.parquet.utils; + +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.MapTypeInfo; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; +import org.apache.flink.api.java.typeutils.RowTypeInfo; + +import org.apache.parquet.avro.AvroSchemaConverter; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.OriginalType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; +import org.apache.parquet.schema.Types; + +import java.lang.reflect.Array; +import java.util.ArrayList; +import java.util.List; + +/** + * Schema converter converts Parquet schema to and from Flink internal types. + */ +public class ParquetSchemaConverter { + public static final String MAP_KEY = "key"; + public static final String MAP_VALUE = "value"; + public static final String LIST_ELEMENT = "array"; + public static final String MESSAGE_ROOT = "root"; + private static final AvroSchemaConverter SCHEMA_CONVERTER = new AvroSchemaConverter(); + + public static TypeInformation fromParquetType(MessageType type) { + return convertFields(type.getFields()); + } + + public static MessageType toParquetType(TypeInformation typeInformation) { + return (MessageType) convertField(null, typeInformation, Type.Repetition.OPTIONAL); + } + + private static TypeInformation convertFields(List parquetFields) { + List> types = new ArrayList<>(); + List names = new ArrayList<>(); + for (Type field : parquetFields) { + TypeInformation subType = convertField(field); + if (subType != null) { + types.add(subType); + names.add(field.getName()); + } + } + + return new RowTypeInfo(types.toArray(new TypeInformation[types.size()]), + names.toArray(new String[names.size()])); + } + + private static 
TypeInformation convertField(final Type fieldType) { + TypeInformation typeInfo = null; + if (fieldType.isPrimitive()) { + PrimitiveType primitiveType = fieldType.asPrimitiveType(); Review comment: This function converts a Parquet primitive type to the corresponding default Flink type. Explicit conversion to SqlTimeTypeInfo or other types can probably be handled by users when there is a need; otherwise, we would need to bring the user's conversion preferences into the interface. > Add ParquetInputFormat > -- > > Key: FLINK-7243 > URL: https://issues.apache.org/jira/browse/FLINK-7243 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: godfrey he >Assignee: Zhenqiu Huang >Priority: Major > Labels: pull-request-ava
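The reviewer suggests leaving explicit time conversions to users. As a sketch of what that could look like, assume the default mapping hands a Parquet DATE or TIMESTAMP_MILLIS column to user code as its raw physical value. Per the Parquet logical-type definitions, DATE is an INT32 count of days since the Unix epoch and TIMESTAMP_MILLIS is an INT64 count of milliseconds since the epoch (UTC). The hypothetical helper below (not part of the PR) turns those values into java.sql.Date and java.sql.Timestamp, the classes described by SqlTimeTypeInfo.DATE and SqlTimeTypeInfo.TIMESTAMP.

```java
import java.sql.Date;
import java.sql.Timestamp;
import java.time.LocalDate;

/**
 * Hypothetical user-side conversions (not part of the PR) from raw Parquet
 * physical values to the java.sql types used by SqlTimeTypeInfo.
 */
public final class ParquetTimeConversions {

    private ParquetTimeConversions() {}

    /** Parquet DATE: INT32 number of days since 1970-01-01. */
    public static Date dateFromEpochDays(int days) {
        // Date.valueOf(LocalDate) yields local midnight of that calendar day.
        return Date.valueOf(LocalDate.ofEpochDay(days));
    }

    /** Parquet TIMESTAMP_MILLIS: INT64 milliseconds since the Unix epoch (UTC). */
    public static Timestamp timestampFromEpochMillis(long millis) {
        return new Timestamp(millis);
    }
}
```

Because Date.valueOf(LocalDate) anchors the value at midnight in the JVM's default time zone, the calendar date round-trips exactly, but the underlying instant depends on that zone; this is the kind of preference the reviewer would rather not bake into the interface.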
[GitHub] HuangZhenQiu commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format
HuangZhenQiu commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format URL: https://github.com/apache/flink/pull/6483#discussion_r212706499 ## File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java ## @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.formats.parquet.utils; + +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.MapTypeInfo; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; +import org.apache.flink.api.java.typeutils.RowTypeInfo; + +import org.apache.parquet.avro.AvroSchemaConverter; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.OriginalType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; +import org.apache.parquet.schema.Types; + +import java.lang.reflect.Array; +import java.util.ArrayList; +import java.util.List; + +/** + * Schema converter converts Parquet schema to and from Flink internal types. + */ +public class ParquetSchemaConverter { + public static final String MAP_KEY = "key"; + public static final String MAP_VALUE = "value"; + public static final String LIST_ELEMENT = "array"; + public static final String MESSAGE_ROOT = "root"; + private static final AvroSchemaConverter SCHEMA_CONVERTER = new AvroSchemaConverter(); + + public static TypeInformation fromParquetType(MessageType type) { + return convertFields(type.getFields()); + } + + public static MessageType toParquetType(TypeInformation typeInformation) { + return (MessageType) convertField(null, typeInformation, Type.Repetition.OPTIONAL); + } + + private static TypeInformation convertFields(List parquetFields) { + List> types = new ArrayList<>(); + List names = new ArrayList<>(); + for (Type field : parquetFields) { + TypeInformation subType = convertField(field); + if (subType != null) { + types.add(subType); + names.add(field.getName()); + } + } + + return new RowTypeInfo(types.toArray(new TypeInformation[types.size()]), + names.toArray(new String[names.size()])); + } + + private static 
TypeInformation convertField(final Type fieldType) { + TypeInformation typeInfo = null; + if (fieldType.isPrimitive()) { + PrimitiveType primitiveType = fieldType.asPrimitiveType(); Review comment: This function converts a Parquet primitive type to the corresponding default Flink type. An explicit conversion to SqlTimeTypeInfo or other types can probably be handled by users when there is a need. Otherwise, we would need to bring the user's conversion preference into the interface. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
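A dependency-free Java sketch of the split the comment describes: the converter applies one default mapping per Parquet primitive type, and any time interpretation is an explicit, caller-side step. The enum is a hypothetical stand-in for a few of Parquet's primitive type names, and plain Java classes stand in for Flink's TypeInformation (for example BasicTypeInfo.LONG_TYPE_INFO). The mapping choices (BINARY to byte[], INT96 left unsupported) are assumptions for illustration, not the PR's actual table. The field loop mirrors the convertFields method quoted above, which drops fields whose converted type is null.

```java
import java.sql.Timestamp;
import java.util.EnumMap;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch: default type mapping in the converter, time semantics left to the caller. */
final class DefaultTypeMappingSketch {
    private DefaultTypeMappingSketch() {}

    /** Stand-in for a few of Parquet's primitive type names; not the real Parquet enum. */
    enum ParquetPrimitive { BOOLEAN, INT32, INT64, FLOAT, DOUBLE, BINARY, INT96 }

    private static final Map<ParquetPrimitive, Class<?>> DEFAULTS = new EnumMap<>(ParquetPrimitive.class);

    static {
        DEFAULTS.put(ParquetPrimitive.BOOLEAN, Boolean.class);
        DEFAULTS.put(ParquetPrimitive.INT32, Integer.class);
        DEFAULTS.put(ParquetPrimitive.INT64, Long.class); // stays a plain Long, no timestamp guess
        DEFAULTS.put(ParquetPrimitive.FLOAT, Float.class);
        DEFAULTS.put(ParquetPrimitive.DOUBLE, Double.class);
        DEFAULTS.put(ParquetPrimitive.BINARY, byte[].class);
        // INT96 is intentionally left unmapped here to show how unsupported fields are skipped.
    }

    /** Returns the default class for a primitive type, or null if this sketch does not support it. */
    static Class<?> defaultClassFor(ParquetPrimitive type) {
        return DEFAULTS.get(type);
    }

    /** Mirrors the convertFields loop: fields whose type maps to null are dropped, order is kept. */
    static Map<String, Class<?>> convertFields(Map<String, ParquetPrimitive> fields) {
        Map<String, Class<?>> row = new LinkedHashMap<>();
        for (Map.Entry<String, ParquetPrimitive> field : fields.entrySet()) {
            Class<?> subType = defaultClassFor(field.getValue());
            if (subType != null) {
                row.put(field.getKey(), subType);
            }
        }
        return row;
    }

    /** User-side conversion, applied only by a caller that knows the INT64 column holds epoch millis. */
    static Timestamp epochMillisToTimestamp(long epochMillis) {
        return new Timestamp(epochMillis);
    }
}
```

Keeping INT64 as a plain Long means the converter never guesses whether a column holds timestamps; a user who knows it does can convert the value (or map the field to SqlTimeTypeInfo.TIMESTAMP in their own code) without a new option on the interface.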
[jira] [Commented] (FLINK-7243) Add ParquetInputFormat
[ https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16591977#comment-16591977 ] ASF GitHub Bot commented on FLINK-7243: --- HuangZhenQiu commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format URL: https://github.com/apache/flink/pull/6483#discussion_r212704322 ## File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetRowInputFormat.java ## @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.parquet; + +import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.core.fs.Path; +import org.apache.flink.types.Row; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * A subclass of {@link ParquetInputFormat} to read from Parquet files and convert to {@link Row}. + * It is mainly used to integrate with table API and batch SQL. 
+ */ +public class ParquetRowInputFormat extends ParquetInputFormat implements ResultTypeQueryable { Review comment: Agree. Once this PR is merged, I will create another PR https://issues.apache.org/jira/browse/FLINK-7244. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add ParquetInputFormat > -- > > Key: FLINK-7243 > URL: https://issues.apache.org/jira/browse/FLINK-7243 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: godfrey he >Assignee: Zhenqiu Huang >Priority: Major > Labels: pull-request-available > > Add a {{ParquetInputFormat}} to read data from an Apache Parquet file. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10212) REST API for listing all the available save points
[ https://issues.apache.org/jira/browse/FLINK-10212?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Marc Rooding updated FLINK-10212: - Description: *Background* I'm one of the authors of the open-source Flink job deployer ([https://github.com/ing-bank/flink-deployer]). Recently, I rewrote our implementation to use the Flink REST API instead of the native CLI. In our use case, we store the job savepoints in a Kubernetes persistent volume. For our deployer, we mount the persistent volume to our deployer container so that we can find and use the savepoints. In the rewrite to the REST API, I saw that the API to monitor savepoint creation returns the complete path to the created savepoint, and we can use this one in the job deployer to start the new job with the latest savepoint. However, we also allow users to deploy a job with a recovered state by specifying only the directory in which savepoints are stored. In this scenario we look for the latest savepoint created for this job inside the given directory ourselves. To find this path, we still rely on the mounted volume and list the directory content to discover savepoints. *Feature* I think it would be a good addition if the native Flink REST API offered the ability to retrieve savepoints. Since the API doesn't inherently know where savepoints are stored, it could take a path as one of the arguments. It could even allow the user to provide a job ID as an argument so that the API could search for savepoints of a specific job ID in the specified directory. As the API would require the path as an argument, and providing a path containing forward slashes in the URL isn't ideal, I'm eager to discuss what a proper solution would look like. A POST request to /jobs/:jobid/savepoints with the path as a body parameter would make sense if the API offered to list all savepoints in a specific path, but this request is already used for creating new savepoints.
An alternative could be a POST to /savepoints with the path and job ID in the request body. A POST request to retrieve data is obviously not the most straightforward approach, but in my opinion it is still preferable to a GET to, for example, /jobs/:jobid/savepoints/:targetDirectory. I'm willing to help out on this one by submitting a pull request. Looking forward to your thoughts! > REST API for listing all the available save points > -- > > Key: FLINK-10212 > URL: https://issues.apache.org/jira/browse/FLINK-10212 > Project: Flink > Issue Type: New Feature >
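The workaround described in the background, scanning the mounted savepoint directory for the newest savepoint of a job, might look roughly like the Java sketch below. It assumes Flink's default savepoint directory naming of savepoint-<first six characters of the job ID>-<random suffix> and treats the most recent last-modified time as "latest"; both are assumptions to check against the Flink version in use, and LatestSavepointFinder is a hypothetical name.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileTime;
import java.util.Comparator;
import java.util.Optional;
import java.util.stream.Stream;

/** Hypothetical helper: finds the newest savepoint directory of a job under a mounted directory. */
final class LatestSavepointFinder {
    private LatestSavepointFinder() {}

    static Optional<Path> findLatest(Path savepointRoot, String jobId) throws IOException {
        // Assumed naming scheme: savepoint-<first six characters of the job ID>-<random suffix>
        String prefix = "savepoint-" + jobId.substring(0, Math.min(6, jobId.length())) + "-";
        try (Stream<Path> entries = Files.list(savepointRoot)) {
            return entries
                    .filter(Files::isDirectory)
                    .filter(p -> p.getFileName().toString().startsWith(prefix))
                    // "Latest" is approximated by the directory's last-modified time.
                    .max(Comparator.comparing(LatestSavepointFinder::lastModified));
        }
    }

    private static FileTime lastModified(Path path) {
        try {
            return Files.getLastModifiedTime(path);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

This is exactly the kind of file-system coupling the feature request wants to remove: the deployer needs the volume mounted and has to know the naming scheme.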
[jira] [Created] (FLINK-10212) REST API for listing all the available save points
Marc Rooding created FLINK-10212: Summary: REST API for listing all the available save points Key: FLINK-10212 URL: https://issues.apache.org/jira/browse/FLINK-10212 Project: Flink Issue Type: New Feature Reporter: Marc Rooding *Background* I'm one of the authors of the open-source Flink job deployer ([https://github.com/ing-bank/flink-deployer]). Recently, I rewrote our implementation to use the Flink REST API instead of the native CLI. In our use case, we store the job savepoints in a Kubernetes persistent volume. For our deployer, we mount the persistent volume to our deployer container so that we can find and use the savepoints. In the rewrite to the REST API, I saw that the API to monitor savepoint creation returns the complete path to the created savepoint, and we can use this one in the job deployer to start the new job with the latest savepoint. However, we also allow users to deploy a job with a recovered state by specifying only the directory in which savepoints are stored. In this scenario we look for the latest savepoint created for this job inside the given directory ourselves. To find this path, we still rely on the mounted volume and list the directory content to discover savepoints. *Feature* I think it would be a good addition if the native Flink REST API offered the ability to retrieve savepoints. Since the API doesn't inherently know where savepoints are stored, it could take a path as one of the arguments. It could even allow the user to provide a job ID as an argument so that the API could search for savepoints of a specific job ID in the specified directory. As the API would require the path as an argument, and providing a path containing forward slashes in the URL isn't ideal, I'm eager to discuss what a proper solution would look like.
A POST request to /jobs/:jobid/savepoints with the path as a body parameter would make sense if the API offered to list all savepoints in a specific path, but this request is already used for creating new savepoints. An alternative could be a POST to /savepoints with the path and job ID in the request body. A POST request to retrieve data is obviously not the most straightforward approach, but in my opinion it is still preferable to a GET to, for example, /jobs/:jobid/savepoints/:targetDirectory. I'm willing to help out on this one by submitting a pull request. Looking forward to your thoughts! -- This message was sent by Atlassian JIRA (v7.6.3#76005)
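For the POST /savepoints alternative, a client request could be built as in the Java 11 sketch below. The endpoint is only a proposal in this issue and does not exist in Flink's REST API; the JSON field names targetDirectory and jobId and the class name are hypothetical. The request is built but not sent, so no server is needed.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Hypothetical client for the proposed (not existing) POST /savepoints listing endpoint. */
final class ListSavepointsRequestSketch {
    private ListSavepointsRequestSketch() {}

    /** Minimal JSON string quoting; escapes backslashes and double quotes only. */
    private static String quote(String value) {
        return "\"" + value.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    /** Field names "targetDirectory" and "jobId" are assumptions, not part of any Flink API. */
    static String body(String targetDirectory, String jobId) {
        return "{\"targetDirectory\":" + quote(targetDirectory) + ",\"jobId\":" + quote(jobId) + "}";
    }

    static HttpRequest build(String restBaseUrl, String targetDirectory, String jobId) {
        // The directory travels in the body, so its slashes never need URL encoding.
        return HttpRequest.newBuilder(URI.create(restBaseUrl + "/savepoints"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body(targetDirectory, jobId)))
                .build();
    }
}
```

Carrying the directory in the body sidesteps the forward-slash problem raised in the description; the trade-off, as the proposal itself notes, is using POST for a read.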
[jira] [Commented] (FLINK-10153) Add tutorial section to documentation
[ https://issues.apache.org/jira/browse/FLINK-10153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16591943#comment-16591943 ] ASF GitHub Bot commented on FLINK-10153: asfgit closed pull request #6565: [FLINK-10153] [docs] Add Tutorials section and rework structure. URL: https://github.com/apache/flink/pull/6565 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/dev/api_concepts.md b/docs/dev/api_concepts.md index c4215074683..bd7ca5aff90 100644 --- a/docs/dev/api_concepts.md +++ b/docs/dev/api_concepts.md @@ -510,7 +510,7 @@ data.map(new MapFunction () { Java 8 Lambdas -Flink also supports Java 8 Lambdas in the Java API. Please see the full [Java 8 Guide]({{ site.baseurl }}/dev/java8.html). +Flink also supports Java 8 Lambdas in the Java API. {% highlight java %} data.filter(s -> s.startsWith("http://")); diff --git a/docs/dev/batch/examples.md b/docs/dev/batch/examples.md index 90e372dfd60..fe2bd8d3bdc 100644 --- a/docs/dev/batch/examples.md +++ b/docs/dev/batch/examples.md @@ -27,8 +27,7 @@ The following example programs showcase different applications of Flink from simple word counting to graph algorithms. The code samples illustrate the use of [Flink's DataSet API]({{ site.baseurl }}/dev/batch/index.html). -The full source code of the following and more examples can be found in the __flink-examples-batch__ -or __flink-examples-streaming__ module of the Flink source repository. +The full source code of the following and more examples can be found in the {% gh_link flink-examples/flink-examples-batch "flink-examples-batch" %} module of the Flink source repository.
* This will be replaced by the TOC {:toc} @@ -420,102 +419,4 @@ Input files are plain text files and must be formatted as follows: - Edges are represented as pairs for vertex IDs which are separated by space characters. Edges are separated by new-line characters: * For example `"1 2\n2 12\n1 12\n42 63\n"` gives four (undirected) links (1)-(2), (2)-(12), (1)-(12), and (42)-(63). -## Relational Query - -The Relational Query example assumes two tables, one with `orders` and the other with `lineitems` as specified by the [TPC-H decision support benchmark](http://www.tpc.org/tpch/). TPC-H is a standard benchmark in the database industry. See below for instructions how to generate the input data. - -The example implements the following SQL query. - -{% highlight sql %} -SELECT l_orderkey, o_shippriority, sum(l_extendedprice) as revenue -FROM orders, lineitem -WHERE l_orderkey = o_orderkey -AND o_orderstatus = "F" -AND YEAR(o_orderdate) > 1993 -AND o_orderpriority LIKE "5%" -GROUP BY l_orderkey, o_shippriority; -{% endhighlight %} - -The Flink program, which implements the above query looks as follows. 
- - - - -{% highlight java %} -// get orders data set: (orderkey, orderstatus, orderdate, orderpriority, shippriority) -DataSet> orders = getOrdersDataSet(env); -// get lineitem data set: (orderkey, extendedprice) -DataSet> lineitems = getLineitemDataSet(env); - -// orders filtered by year: (orderkey, custkey) -DataSet> ordersFilteredByYear = -// filter orders -orders.filter( -new FilterFunction>() { -@Override -public boolean filter(Tuple5 t) { -// status filter -if(!t.f1.equals(STATUS_FILTER)) { -return false; -// year filter -} else if(Integer.parseInt(t.f2.substring(0, 4)) <= YEAR_FILTER) { -return false; -// order priority filter -} else if(!t.f3.startsWith(OPRIO_FILTER)) { -return false; -} -return true; -} -}) -// project fields out that are no longer required -.project(0,4).types(Integer.class, Integer.class); - -// join orders with lineitems: (orderkey, shippriority, extendedprice) -DataSet> lineitemsOfOrders = -ordersFilteredByYear.joinWithHuge(lineitems) -.where(0).equalTo(0) -.projectFirst(0,1).projectSecond(1) -.types(Integer.class, Integer.class, Double.class); - -// extendedprice sums: (orderkey, shippriority, sum(extendedprice)) -DataSet> priceSums = -// group by order and sum extendedprice -lineitemsOfOrders.groupBy(0,1).aggregate(Aggregations.SUM, 2); - -// emit result -priceSums.writeAsCsv(outputPath); -{% endhighlight %} - -The {% gh_link /flink-examples/flink-examples-batch/src/main/java/org/apache/flink/
[jira] [Closed] (FLINK-10153) Add tutorial section to documentation
[ https://issues.apache.org/jira/browse/FLINK-10153?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske closed FLINK-10153. - Resolution: Done Fix Version/s: 1.7.0 Done for 1.7.0 with 52cbe07ba7a367880475af59596adc2365bd8a21 > Add tutorial section to documentation > - > > Key: FLINK-10153 > URL: https://issues.apache.org/jira/browse/FLINK-10153 > Project: Flink > Issue Type: Improvement > Components: Documentation >Reporter: Fabian Hueske >Assignee: Fabian Hueske >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > The current documentation does not feature a dedicated tutorials section and > has a few issues that should be fixed in order to help our (future) users > get started with Flink. > I propose to add a single "Tutorials" section to the documentation where > users find step-by-step guides. The tutorials section helps users with > different goals: > * Get a quick idea of the overall system > * Implement a DataStream/DataSet/Table API/SQL job > * Set up Flink on a local machine (or run a Docker container) > There are already a few guides to get started, but they are located at > different places and should be moved into the Tutorials section. Moreover, > some sections such as "Project Setup" contain content that addresses users > with very different intentions. > I propose to > * add a new Tutorials section and move all existing tutorials there (and > later add new ones). > * move the "Quickstart" section to "Tutorials". > * remove the "Project Setup" section and move the pages to other sections > (some pages will be split up or adjusted). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] asfgit closed pull request #6565: [FLINK-10153] [docs] Add Tutorials section and rework structure.
asfgit closed pull request #6565: [FLINK-10153] [docs] Add Tutorials section and rework structure. URL: https://github.com/apache/flink/pull/6565 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/dev/api_concepts.md b/docs/dev/api_concepts.md index c4215074683..bd7ca5aff90 100644 --- a/docs/dev/api_concepts.md +++ b/docs/dev/api_concepts.md @@ -510,7 +510,7 @@ data.map(new MapFunction () { Java 8 Lambdas -Flink also supports Java 8 Lambdas in the Java API. Please see the full [Java 8 Guide]({{ site.baseurl }}/dev/java8.html). +Flink also supports Java 8 Lambdas in the Java API. {% highlight java %} data.filter(s -> s.startsWith("http://")); diff --git a/docs/dev/batch/examples.md b/docs/dev/batch/examples.md index 90e372dfd60..fe2bd8d3bdc 100644 --- a/docs/dev/batch/examples.md +++ b/docs/dev/batch/examples.md @@ -27,8 +27,7 @@ The following example programs showcase different applications of Flink from simple word counting to graph algorithms. The code samples illustrate the use of [Flink's DataSet API]({{ site.baseurl }}/dev/batch/index.html). -The full source code of the following and more examples can be found in the __flink-examples-batch__ -or __flink-examples-streaming__ module of the Flink source repository. +The full source code of the following and more examples can be found in the {% gh_link flink-examples/flink-examples-batch "flink-examples-batch" %} module of the Flink source repository. * This will be replaced by the TOC {:toc} @@ -420,102 +419,4 @@ Input files are plain text files and must be formatted as follows: - Edges are represented as pairs for vertex IDs which are separated by space characters.
Edges are separated by new-line characters: * For example `"1 2\n2 12\n1 12\n42 63\n"` gives four (undirected) links (1)-(2), (2)-(12), (1)-(12), and (42)-(63). -## Relational Query - -The Relational Query example assumes two tables, one with `orders` and the other with `lineitems` as specified by the [TPC-H decision support benchmark](http://www.tpc.org/tpch/). TPC-H is a standard benchmark in the database industry. See below for instructions how to generate the input data. - -The example implements the following SQL query. - -{% highlight sql %} -SELECT l_orderkey, o_shippriority, sum(l_extendedprice) as revenue -FROM orders, lineitem -WHERE l_orderkey = o_orderkey -AND o_orderstatus = "F" -AND YEAR(o_orderdate) > 1993 -AND o_orderpriority LIKE "5%" -GROUP BY l_orderkey, o_shippriority; -{% endhighlight %} - -The Flink program, which implements the above query looks as follows. - - - - -{% highlight java %} -// get orders data set: (orderkey, orderstatus, orderdate, orderpriority, shippriority) -DataSet> orders = getOrdersDataSet(env); -// get lineitem data set: (orderkey, extendedprice) -DataSet> lineitems = getLineitemDataSet(env); - -// orders filtered by year: (orderkey, custkey) -DataSet> ordersFilteredByYear = -// filter orders -orders.filter( -new FilterFunction>() { -@Override -public boolean filter(Tuple5 t) { -// status filter -if(!t.f1.equals(STATUS_FILTER)) { -return false; -// year filter -} else if(Integer.parseInt(t.f2.substring(0, 4)) <= YEAR_FILTER) { -return false; -// order priority filter -} else if(!t.f3.startsWith(OPRIO_FILTER)) { -return false; -} -return true; -} -}) -// project fields out that are no longer required -.project(0,4).types(Integer.class, Integer.class); - -// join orders with lineitems: (orderkey, shippriority, extendedprice) -DataSet> lineitemsOfOrders = -ordersFilteredByYear.joinWithHuge(lineitems) -.where(0).equalTo(0) -.projectFirst(0,1).projectSecond(1) -.types(Integer.class, Integer.class, Double.class); - -// 
extendedprice sums: (orderkey, shippriority, sum(extendedprice)) -DataSet> priceSums = -// group by order and sum extendedprice -lineitemsOfOrders.groupBy(0,1).aggregate(Aggregations.SUM, 2); - -// emit result -priceSums.writeAsCsv(outputPath); -{% endhighlight %} - -The {% gh_link /flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java "Relational Query program" %} implements the above query. It requires the following parameters to run: `--orders --lineitem --output `. - - - -Coming soon... - -The {% gh_link /flink-examples/flink-examples-batch
[jira] [Commented] (FLINK-10163) Support CREATE VIEW in SQL Client
[ https://issues.apache.org/jira/browse/FLINK-10163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16591933#comment-16591933 ] ASF GitHub Bot commented on FLINK-10163: fhueske commented on a change in pull request #6606: [FLINK-10163] [sql-client] Support views in SQL Client URL: https://github.com/apache/flink/pull/6606#discussion_r212693978 ## File path: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java ## @@ -390,51 +395,101 @@ private void callSelect(SqlCommandCall cmdCall) { view.open(); // view left - terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_RESULT_QUIT).toAnsi()); - terminal.flush(); + printInfo(CliStrings.MESSAGE_RESULT_QUIT); } catch (SqlExecutionException e) { - printException(e); + printExecutionException(e); } } private boolean callInsertInto(SqlCommandCall cmdCall) { - terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_SUBMITTING_STATEMENT).toAnsi()); - terminal.flush(); + printInfo(CliStrings.MESSAGE_SUBMITTING_STATEMENT); try { final ProgramTargetDescriptor programTarget = executor.executeUpdate(context, cmdCall.operands[0]); terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_STATEMENT_SUBMITTED).toAnsi()); terminal.writer().println(programTarget.toString()); terminal.flush(); } catch (SqlExecutionException e) { - printException(e); + printExecutionException(e); return false; } return true; } - private void callSource(SqlCommandCall cmdCall) { - final String pathString = cmdCall.operands[0]; + private void callCreateView(SqlCommandCall cmdCall) { + final String name = cmdCall.operands[0]; + final String query = cmdCall.operands[1]; + + try { + // validate with a copy + final SessionContext contextCopy = context.copy(); Review comment: This check fails if the VIEW query contains UDFs. It seems that these are not registered yet. This is an automated message from the Apache Git Service. 
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Support CREATE VIEW in SQL Client > - > > Key: FLINK-10163 > URL: https://issues.apache.org/jira/browse/FLINK-10163 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > The possibility to define a name for a subquery would improve the usability > of the SQL Client. The SQL standard defines \{{CREATE VIEW}} for defining a > virtual table. > > Example: > {code} > CREATE VIEW viewName > [ '(' columnName [, columnName ]* ')' ] > AS Query > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
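The copy-then-validate step in callCreateView only works if the copy carries everything the view query can reference. The Java sketch below uses a hypothetical SessionSketch class, not Flink's SessionContext, and has the caller pass in the function names a query uses, where a real client would obtain them from the SQL parser. A copy that omitted the registered functions would reject every view calling a UDF, which is one possible cause of the failure reported in the comment.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for a SQL Client session; not Flink's SessionContext. */
final class SessionSketch {
    private final Set<String> functions = new HashSet<>();
    private final Map<String, String> views = new HashMap<>();

    void registerFunction(String name) {
        functions.add(name);
    }

    Map<String, String> views() {
        return Collections.unmodifiableMap(views);
    }

    /** Copies functions as well as views, so UDF references can be validated against the copy. */
    SessionSketch copy() {
        SessionSketch c = new SessionSketch();
        c.functions.addAll(functions);
        c.views.putAll(views);
        return c;
    }

    /**
     * Registers the view in a copy, validates there, and only then commits it to this session.
     * usedFunctions stands in for what a parser would extract from the query.
     */
    boolean createView(String name, String query, Set<String> usedFunctions) {
        SessionSketch candidate = copy();
        candidate.views.put(name, query);
        if (!candidate.functions.containsAll(usedFunctions)) {
            return false; // validation failed; the live session is unchanged
        }
        views.put(name, query);
        return true;
    }
}
```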
[GitHub] fhueske commented on a change in pull request #6606: [FLINK-10163] [sql-client] Support views in SQL Client
fhueske commented on a change in pull request #6606: [FLINK-10163] [sql-client] Support views in SQL Client URL: https://github.com/apache/flink/pull/6606#discussion_r212621362 ## File path: docs/dev/table/sqlClient.md ## @@ -459,6 +466,50 @@ Web interface: http://localhost:8081 {% top %} +SQL Views +- + +Views allow to define virtual tables from SQL queries. The view definition is parsed and validated immediately. However, the actual execution happens when the view is accessed during the submission of a general `INSERT INTO` or `SELECT` statement. + +Views can either be defined in [environment files](sqlClient.html#environment-files) or within the CLI session. + +The following example shows how to define multiple views in a file: + +{% highlight yaml %} +views: + - name: MyRestrictedView +query: "SELECT MyField2 FROM MyTableSource" + - name: MyComplexView +query: > + SELECT MyField2 + 42, CAST(MyField1 AS VARCHAR) + FROM MyTableSource + WHERE MyField2 > 200 +{% endhighlight %} + +Similar to table sources and sinks, views defined in a session environment file have highest precendence. + +Views can also be created within a CLI session using the `CREATE VIEW` statement: + +{% highlight text %} +CREATE VIEW MyNewView AS SELECT MyField2 FROM MyTableSource +{% endhighlight %} + +The `SHOW VIEW` statement allows for printing a previously created view again: Review comment: I think the way that these commands work is a bit inconsistent with the `SHOW TABLES`, `SHOW FUNCTIONS`, and `DESCRIBE xxx` commands. 1. Views are listed as regular tables when calling `SHOW TABLES`. * List tables and views together, but mark views as views * Or add a `SHOW VIEWS` to only list views (and only list tables with `LIST TABLES`) 2. The schema of a view is returned with `DESCRIBE xxx`. This is fine, but IMO, we should also return the query. Showing schema and query with different commands is not intuitive to me. This is an automated message from the Apache Git Service. 
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
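The first option suggested in the comment (list tables and views together, with views marked) and the suggestion that DESCRIBE return the query along with the schema could be sketched in Java as follows. The "(VIEW)" marker, the "Query:" line and the class name are hypothetical choices, not the SQL Client's actual output format.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical listing helper: tables and views in one sorted list, views marked. */
final class ShowTablesSketch {
    private ShowTablesSketch() {}

    static List<String> showTables(Collection<String> tables, Collection<String> views) {
        // name -> isView, sorted by name; a name present in both is shown as a view.
        TreeMap<String, Boolean> entries = new TreeMap<>();
        for (String table : tables) {
            entries.put(table, false);
        }
        for (String view : views) {
            entries.put(view, true);
        }
        List<String> lines = new ArrayList<>();
        for (Map.Entry<String, Boolean> e : entries.entrySet()) {
            lines.add(e.getValue() ? e.getKey() + " (VIEW)" : e.getKey());
        }
        return lines;
    }

    /** DESCRIBE for a view: the schema followed by the defining query, in one command. */
    static String describeView(String schema, String query) {
        return schema + System.lineSeparator() + "Query: " + query;
    }
}
```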
[jira] [Commented] (FLINK-10163) Support CREATE VIEW in SQL Client
[ https://issues.apache.org/jira/browse/FLINK-10163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16591932#comment-16591932 ] ASF GitHub Bot commented on FLINK-10163: fhueske commented on a change in pull request #6606: [FLINK-10163] [sql-client] Support views in SQL Client URL: https://github.com/apache/flink/pull/6606#discussion_r212609696 ## File path: docs/dev/table/sqlClient.md ## @@ -459,6 +466,50 @@ Web interface: http://localhost:8081 {% top %} +SQL Views +- + +Views allow to define virtual tables from SQL queries. The view definition is parsed and validated immediately. However, the actual execution happens when the view is accessed during the submission of a general `INSERT INTO` or `SELECT` statement. + +Views can either be defined in [environment files](sqlClient.html#environment-files) or within the CLI session. + +The following example shows how to define multiple views in a file: + +{% highlight yaml %} +views: + - name: MyRestrictedView +query: "SELECT MyField2 FROM MyTableSource" + - name: MyComplexView +query: > + SELECT MyField2 + 42, CAST(MyField1 AS VARCHAR) + FROM MyTableSource + WHERE MyField2 > 200 +{% endhighlight %} + +Similar to table sources and sinks, views defined in a session environment file have highest precendence. + +Views can also be created within a CLI session using the `CREATE VIEW` statement: + +{% highlight text %} +CREATE VIEW MyNewView AS SELECT MyField2 FROM MyTableSource +{% endhighlight %} + +The `SHOW VIEW` statement allows for printing a previously created view again: + +{% highlight text %} +SHOW VIEW MyNewView +{% endhighlight %} + +Views created within a CLI session can also be removed again using the `DROP VIEW` statement: + +{% highlight text %} +DROP VIEW MyNewView +{% endhighlight %} + +Attention The definition of views is limited to the mentioned semantics above. Defining a schema for views or escape whitespaces in table names will be supported in future versions. 
Review comment: semantics -> syntax This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Support CREATE VIEW in SQL Client > - > > Key: FLINK-10163 > URL: https://issues.apache.org/jira/browse/FLINK-10163 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > The possibility to define a name for a subquery would improve the usability > of the SQL Client. The SQL standard defines \{{CREATE VIEW}} for defining a > virtual table. > > Example: > {code} > CREATE VIEW viewName > [ '(' columnName [, columnName ]* ')' ] > AS Query > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10163) Support CREATE VIEW in SQL Client
[ https://issues.apache.org/jira/browse/FLINK-10163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16591934#comment-16591934 ] ASF GitHub Bot commented on FLINK-10163: fhueske commented on a change in pull request #6606: [FLINK-10163] [sql-client] Support views in SQL Client URL: https://github.com/apache/flink/pull/6606#discussion_r212621362 ## File path: docs/dev/table/sqlClient.md ## @@ -459,6 +466,50 @@ Web interface: http://localhost:8081 {% top %} +SQL Views +- + +Views allow to define virtual tables from SQL queries. The view definition is parsed and validated immediately. However, the actual execution happens when the view is accessed during the submission of a general `INSERT INTO` or `SELECT` statement. + +Views can either be defined in [environment files](sqlClient.html#environment-files) or within the CLI session. + +The following example shows how to define multiple views in a file: + +{% highlight yaml %} +views: + - name: MyRestrictedView +query: "SELECT MyField2 FROM MyTableSource" + - name: MyComplexView +query: > + SELECT MyField2 + 42, CAST(MyField1 AS VARCHAR) + FROM MyTableSource + WHERE MyField2 > 200 +{% endhighlight %} + +Similar to table sources and sinks, views defined in a session environment file have highest precendence. + +Views can also be created within a CLI session using the `CREATE VIEW` statement: + +{% highlight text %} +CREATE VIEW MyNewView AS SELECT MyField2 FROM MyTableSource +{% endhighlight %} + +The `SHOW VIEW` statement allows for printing a previously created view again: Review comment: I think the way that these commands work is a bit inconsistent with the `SHOW TABLES`, `SHOW FUNCTIONS`, and `DESCRIBE xxx` commands. 1. Views are listed as regular tables when calling `SHOW TABLES`. * List tables and views together, but mark views as views * Or add a `SHOW VIEWS` to only list views (and only list tables with `LIST TABLES`) 2. The schema of a view is returned with `DESCRIBE xxx`. 
This is fine, but IMO, we should also return the query. Showing schema and query with different commands is not intuitive to me. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Support CREATE VIEW in SQL Client > - > > Key: FLINK-10163 > URL: https://issues.apache.org/jira/browse/FLINK-10163 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > The possibility to define a name for a subquery would improve the usability > of the SQL Client. The SQL standard defines \{{CREATE VIEW}} for defining a > virtual table. > > Example: > {code} > CREATE VIEW viewName > [ '(' columnName [, columnName ]* ')' ] > AS Query > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10163) Support CREATE VIEW in SQL Client
[ https://issues.apache.org/jira/browse/FLINK-10163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16591930#comment-16591930 ] ASF GitHub Bot commented on FLINK-10163: fhueske commented on a change in pull request #6606: [FLINK-10163] [sql-client] Support views in SQL Client URL: https://github.com/apache/flink/pull/6606#discussion_r212654542 ## File path: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java ## @@ -67,24 +75,36 @@ public Environment() { public void setTables(List> tables) { this.tables = new HashMap<>(tables.size()); tables.forEach(config -> { - if (!config.containsKey(TABLE_NAME)) { - throw new SqlClientException("The 'name' attribute of a table is missing."); - } - final Object nameObject = config.get(TABLE_NAME); - if (nameObject == null || !(nameObject instanceof String) || ((String) nameObject).length() <= 0) { - throw new SqlClientException("Invalid table name '" + nameObject + "'."); - } - final String name = (String) nameObject; + final String name = extractEarlyStringProperty(config, TABLE_NAME, "table"); final Map properties = new HashMap<>(config); properties.remove(TABLE_NAME); - if (this.tables.containsKey(name)) { + if (this.tables.containsKey(name) || this.views.containsKey(name)) { throw new SqlClientException("Duplicate table name '" + name + "'."); } this.tables.put(name, createTableDescriptor(name, properties)); }); } + public Map getViews() { + return views; + } + + public void setViews(List> views) { + // the order of how views are registered matters because + // they might reference each other + this.views = new LinkedHashMap<>(views.size()); + views.forEach(config -> { + final String name = extractEarlyStringProperty(config, VIEW_NAME, "view"); + final String query = extractEarlyStringProperty(config, VIEW_QUERY, "view"); + + if (this.tables.containsKey(name) || this.views.containsKey(name)) { + throw new SqlClientException("Duplicate 
table name '" + name + "'."); Review comment: `table name` -> `view name` (or rewrite the error message to something like "Cannot create view XXX because another table or view with that name is already registered.") This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Support CREATE VIEW in SQL Client > - > > Key: FLINK-10163 > URL: https://issues.apache.org/jira/browse/FLINK-10163 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > The possibility to define a name for a subquery would improve the usability > of the SQL Client. The SQL standard defines \{{CREATE VIEW}} for defining a > virtual table. > > Example: > {code} > CREATE VIEW viewName > [ '(' columnName [, columnName ]* ')' ] > AS Query > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
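The review asks for a clearer duplicate-name message, and the quoted diff registers views in a `LinkedHashMap` because views may reference each other. Below is a minimal sketch of both ideas. `ViewRegistrySketch` is a hypothetical, simplified class and not Flink's `Environment`. It throws `IllegalArgumentException` where the real code throws `SqlClientException`.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified registry (not Flink's Environment class);
// IllegalArgumentException stands in for SqlClientException.
class ViewRegistrySketch {
    private final Map<String, Object> tables = new HashMap<>();
    // LinkedHashMap preserves declaration order, so a view is registered
    // after the views it references.
    private final Map<String, String> views = new LinkedHashMap<>();

    void addTable(String name, Object descriptor) {
        checkNameIsFree(name, "table");
        tables.put(name, descriptor);
    }

    void addView(String name, String query) {
        checkNameIsFree(name, "view");
        views.put(name, query);
    }

    Map<String, String> getViews() {
        return views;
    }

    // One shared check for tables and views; the message names the kind being created.
    private void checkNameIsFree(String name, String kind) {
        if (tables.containsKey(name) || views.containsKey(name)) {
            throw new IllegalArgumentException("Cannot create " + kind + " '" + name
                + "' because another table or view with that name is already registered.");
        }
    }
}
```

A single helper keeps the table and view checks symmetric. Because the message is parameterized by kind, it cannot say "table" when a view is being created.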
[jira] [Commented] (FLINK-10163) Support CREATE VIEW in SQL Client
[ https://issues.apache.org/jira/browse/FLINK-10163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16591931#comment-16591931 ] ASF GitHub Bot commented on FLINK-10163: fhueske commented on a change in pull request #6606: [FLINK-10163] [sql-client] Support views in SQL Client URL: https://github.com/apache/flink/pull/6606#discussion_r212622912 ## File path: flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java ## @@ -30,86 +35,124 @@ private SqlCommandParser() { } public static Optional parse(String stmt) { - String trimmed = stmt.trim(); + // normalize + stmt = stmt.trim(); // remove ';' at the end because many people type it intuitively - if (trimmed.endsWith(";")) { - trimmed = trimmed.substring(0, trimmed.length() - 1); + if (stmt.endsWith(";")) { + stmt = stmt.substring(0, stmt.length() - 1).trim(); } + + // parse for (SqlCommand cmd : SqlCommand.values()) { - int pos = 0; - int tokenCount = 0; - for (String token : trimmed.split("\\s")) { - pos += token.length() + 1; // include space character - // check for content - if (token.length() > 0) { - // match - if (tokenCount < cmd.tokens.length && token.equalsIgnoreCase(cmd.tokens[tokenCount])) { - if (tokenCount == cmd.tokens.length - 1) { - final SqlCommandCall call = new SqlCommandCall( - cmd, - splitOperands(cmd, trimmed, trimmed.substring(Math.min(pos, trimmed.length( - ); - return Optional.of(call); - } - } else { - // next sql command - break; - } - tokenCount++; // check next token + final Pattern pattern = Pattern.compile(cmd.matchingRegex, Pattern.CASE_INSENSITIVE | Pattern.DOTALL); Review comment: Pattern compilation is quite heavy. Can we do this just once and not for every entered command? Maybe store the compiled pattern in the enum? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Support CREATE VIEW in SQL Client > - > > Key: FLINK-10163 > URL: https://issues.apache.org/jira/browse/FLINK-10163 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > The possibility to define a name for a subquery would improve the usability > of the SQL Client. The SQL standard defines \{{CREATE VIEW}} for defining a > virtual table. > > Example: > {code} > CREATE VIEW viewName > [ '(' columnName [, columnName ]* ')' ] > AS Query > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
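The review suggests compiling each command's regex once and storing it in the enum instead of calling `Pattern.compile` for every entered statement. Below is a minimal sketch of that suggestion. `SqlCommandSketch` and its three regexes are hypothetical and do not reproduce the PR's actual `SqlCommand` definitions.

```java
import java.util.Optional;
import java.util.regex.Pattern;

// Hypothetical command set; the real SqlCommand enum and its regexes may differ.
enum SqlCommandSketch {
    CREATE_VIEW("CREATE\\s+VIEW\\s+(\\S+)\\s+AS\\s+(.*)"),
    DROP_VIEW("DROP\\s+VIEW\\s+(.*)"),
    SHOW_TABLES("SHOW\\s+TABLES");

    // Compiled once, when the enum class is initialized, not once per statement.
    final Pattern pattern;

    SqlCommandSketch(String matchingRegex) {
        this.pattern = Pattern.compile(matchingRegex, Pattern.CASE_INSENSITIVE | Pattern.DOTALL);
    }

    // Normalizes the statement as the quoted diff does, then tries each precompiled pattern.
    static Optional<SqlCommandSketch> match(String stmt) {
        String normalized = stmt.trim();
        if (normalized.endsWith(";")) {
            normalized = normalized.substring(0, normalized.length() - 1).trim();
        }
        for (SqlCommandSketch cmd : values()) {
            if (cmd.pattern.matcher(normalized).matches()) {
                return Optional.of(cmd);
            }
        }
        return Optional.empty();
    }
}
```

Compiled `Pattern` instances are immutable and safe to share, so only the cheap `Matcher` is created per statement.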
[jira] [Commented] (FLINK-10204) Job is marked as FAILED after serialization exception
[ https://issues.apache.org/jira/browse/FLINK-10204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16591852#comment-16591852 ] ASF GitHub Bot commented on FLINK-10204: benlamonica commented on issue #6610: [FLINK-10204] - fix serialization/copy error for LatencyMarker records. URL: https://github.com/apache/flink/pull/6610#issuecomment-415805492 @zentol do you want me to squash the commits and force push to this branch so that the history remains clean? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Job is marked as FAILED after serialization exception > - > > Key: FLINK-10204 > URL: https://issues.apache.org/jira/browse/FLINK-10204 > Project: Flink > Issue Type: Bug >Reporter: Ben La Monica >Priority: Major > Labels: pull-request-available > > We have a long running flink job that eventually fails and is shut down due > to an internal serialization exception that we keep on getting. Here is the > stack trace: > {code:java} > 2018-08-23 18:39:48,199 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph - Job NAV Estimation > (4b5463d76167f9f5aac83a890e8a867d) switched from state FAILING to FAILED. 
> java.io.IOException: Corrupt stream, found tag: 127 > at > org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:219) > at > org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:49) > at > org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55) > at > org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:140) > at > org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:206) > at > org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:115) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703) > at java.lang.Thread.run(Thread.java:748){code} > > I think I have tracked down the issue to a mismatch in the > serialization/deserialization/copy code in the StreamElementSerializer with > regards to the LATENCY_MARKER. > The Serialization logic writes 3 LONGs and an INT. The copy logic only writes > (and reads) a LONG and 2 INTs. Adding a test for the LatencyMarker throws an > EOFException, and fixing the copy code causes the test to pass again. > I've written a unit test that highlights the problem, and have written the > code to correct it. > I'll submit a PR that goes along with it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
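The report says serialization writes 3 LONGs and an INT, while the copy path moves only a LONG and 2 INTs. Below is a minimal sketch of keeping the two paths symmetric. It assumes the three longs are `markedTime` plus the two halves of the operator ID, followed by `subtaskIndex`. `LatencyMarkerCodecSketch` is hypothetical and omits the tag byte and the other stream elements that `StreamElementSerializer` handles.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for the LATENCY_MARKER payload (assumed layout):
// markedTime (long), operator ID as two longs, subtaskIndex (int) = 28 bytes.
final class LatencyMarkerCodecSketch {

    static void serialize(long markedTime, long opIdLower, long opIdUpper, int subtaskIndex,
                          DataOutputStream out) throws IOException {
        out.writeLong(markedTime);
        out.writeLong(opIdLower);
        out.writeLong(opIdUpper);
        out.writeInt(subtaskIndex);
    }

    // copy() must transfer exactly the fields serialize() wrote, with the same widths
    // and order. Copying "a LONG and 2 INTs" moves only 16 of the 28 bytes, so the
    // leftover bytes are later misread as the start of the next record.
    static void copy(DataInputStream in, DataOutputStream out) throws IOException {
        out.writeLong(in.readLong());
        out.writeLong(in.readLong());
        out.writeLong(in.readLong());
        out.writeInt(in.readInt());
    }

    // Serializes one marker, copies it into a second buffer, and returns the copy.
    static byte[] serializeThenCopy(long markedTime, long opIdLower, long opIdUpper, int subtaskIndex) {
        try {
            ByteArrayOutputStream original = new ByteArrayOutputStream();
            serialize(markedTime, opIdLower, opIdUpper, subtaskIndex, new DataOutputStream(original));
            ByteArrayOutputStream copied = new ByteArrayOutputStream();
            copy(new DataInputStream(new ByteArrayInputStream(original.toByteArray())),
                 new DataOutputStream(copied));
            return copied.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads the four fields back; the int is widened to long for convenience.
    static long[] deserialize(byte[] bytes) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            return new long[] {in.readLong(), in.readLong(), in.readLong(), in.readInt()};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A round-trip test through `copy()` is what exposes this class of bug. A plain serialize/deserialize test passes even when `copy()` is wrong.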
[jira] [Commented] (FLINK-10204) Job is marked as FAILED after serialization exception
[ https://issues.apache.org/jira/browse/FLINK-10204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16591804#comment-16591804 ] ASF GitHub Bot commented on FLINK-10204: benlamonica commented on a change in pull request #6610: [FLINK-10204] - fix serialization/copy error for LatencyMarker records. URL: https://github.com/apache/flink/pull/6610#discussion_r212666184 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/LatencyMarker.java ## @@ -67,31 +67,40 @@ public int getSubtaskIndex() { // @Override - public boolean equals(Object o) { - if (this == o) { + public boolean equals(Object obj) { + if (this == obj) { return true; } - if (o == null || getClass() != o.getClass()){ + if (obj == null) { return false; } - - LatencyMarker that = (LatencyMarker) o; - - if (markedTime != that.markedTime) { + if (getClass() != obj.getClass()) { return false; } - if (operatorId != that.operatorId) { Review comment: Sure, I will do that. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Job is marked as FAILED after serialization exception > - > > Key: FLINK-10204 > URL: https://issues.apache.org/jira/browse/FLINK-10204 > Project: Flink > Issue Type: Bug >Reporter: Ben La Monica >Priority: Major > Labels: pull-request-available > > We have a long running flink job that eventually fails and is shut down due > to an internal serialization exception that we keep on getting. Here is the > stack trace: > {code:java} > 2018-08-23 18:39:48,199 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph - Job NAV Estimation > (4b5463d76167f9f5aac83a890e8a867d) switched from state FAILING to FAILED. 
> java.io.IOException: Corrupt stream, found tag: 127 > at > org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:219) > at > org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:49) > at > org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55) > at > org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:140) > at > org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:206) > at > org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:115) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703) > at java.lang.Thread.run(Thread.java:748){code} > > I think I have tracked down the issue to a mismatch in the > serialization/deserialization/copy code in the StreamElementSerializer with > regards to the LATENCY_MARKER. > The Serialization logic writes 3 LONGs and an INT. The copy logic only writes > (and reads) a LONG and 2 INTs. Adding a test for the LatencyMarker throws an > EOFException, and fixing the copy code causes the test to pass again. > I've written a unit test that highlights the problem, and have written the > code to correct it. > I'll submit a PR that goes along with it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9559) The type of a union of CHAR columns of different lengths should be VARCHAR
[ https://issues.apache.org/jira/browse/FLINK-9559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16591783#comment-16591783 ] ASF GitHub Bot commented on FLINK-9559: --- pnowojski commented on issue #6519: [FLINK-9559] [table] The type of a union of CHAR columns of different lengths should be VARCHAR URL: https://github.com/apache/flink/pull/6519#issuecomment-415789873 @hequn8128 I'm pretty sure that what you have just pointed out is a bug in Flink SQL. I'm not 100% sure (maybe 95%), but when comparing different types like `CHAR(3)` with `CHAR(5)` or `VARCHAR(2)`, all of the operands should first be converted to the same type (adding/ignoring padding when necessary) and only then compared. In other words, `'A ' == 'A'` should return true, and this PR is not a proper fix for this problem. For example, it doesn't fix: ``` SELECT * FROM country WHERE country_name = 'GERMANY ' ``` This further convinces me that: > and our CHAR(x) support seems to be broken (for example 'A ' || 'B ' should return 'A B ') We should probably either convert string literals to `VARCHAR` or properly implement support for `CHAR`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > The type of a union of CHAR columns of different lengths should be VARCHAR > -- > > Key: FLINK-9559 > URL: https://issues.apache.org/jira/browse/FLINK-9559 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Hequn Cheng >Assignee: Hequn Cheng >Priority: Major > Labels: pull-request-available > > Currently, if the case-when expression has two branches which return string > literals, redundant white spaces will be appended to the shorter string literal.
> For example, for the SQL query case 1 when 1 then 'a' when 2 then 'bcd' end, the > return value will be 'a ' of type CHAR(3) instead of 'a'. > Although this follows the behavior of strict SQL standard mode (SQL:2003), we > should return the pragmatic, non-blank-padded type in real scenarios. > Happily, this problem has been fixed by > [CALCITE-2321|https://issues.apache.org/jira/browse/CALCITE-2321], so we can > upgrade Calcite to the next release (1.17.0) and override > {{RelDataTypeSystem}} in Flink to configure the return type, i.e., by making > {{shouldConvertRaggedUnionTypesToVarying()}} return true. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
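pnowojski argues that comparing `CHAR` operands should ignore padding, so `'A ' == 'A'` is true. Below is a minimal sketch of that rule, assuming SQL-standard PAD SPACE semantics: the shorter operand is conceptually blank-padded to the longer length before comparing. `CharCompareSketch.padSpaceEquals` is a hypothetical helper, not Flink or Calcite code.

```java
// Hypothetical helper illustrating PAD SPACE comparison of CHAR values:
// pad the shorter operand with blanks, then compare character by character.
final class CharCompareSketch {

    static boolean padSpaceEquals(String a, String b) {
        int len = Math.max(a.length(), b.length());
        return padRight(a, len).equals(padRight(b, len));
    }

    // Appends blanks until the string reaches the target length.
    private static String padRight(String s, int len) {
        StringBuilder sb = new StringBuilder(s);
        while (sb.length() < len) {
            sb.append(' ');
        }
        return sb.toString();
    }
}
```

Under this rule, the `country_name = 'GERMANY '` predicate from the comment matches a stored `'GERMANY'`. Blanks inside a value still count, so `'A B'` and `'AB'` remain different.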
[jira] [Commented] (FLINK-9391) Support UNNEST in Table API
[ https://issues.apache.org/jira/browse/FLINK-9391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16591754#comment-16591754 ] Hequn Cheng commented on FLINK-9391: Hi [~SokolovMS], thanks for contributing to Flink. You are right: UNNEST is built on top of user-defined table functions, which work differently from scalar functions. Tests for table functions can be found in {{CorrelateITCase}}; you can study the implementation there. If you have any questions, feel free to ask. > Support UNNEST in Table API > --- > > Key: FLINK-9391 > URL: https://issues.apache.org/jira/browse/FLINK-9391 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Alina Ipatina >Priority: Major > > FLINK-6033 introduced the UNNEST function for SQL. We should also add this > function to the Table API to keep the APIs in sync. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
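The comment notes that UNNEST is a table function rather than a scalar function. Below is a plain-Java sketch of the conceptual difference. It uses no Flink classes, and `UnnestSketch` and its `Row` type are hypothetical. A scalar function yields exactly one value per input row. A table function such as UNNEST yields zero or more rows per input row, each joined back to the outer row.

```java
import java.util.ArrayList;
import java.util.List;

// Conceptual illustration only; not Flink's TableFunction API.
final class UnnestSketch {

    // Hypothetical input row: an id plus an array-valued column.
    static final class Row {
        final int id;
        final List<String> tags;

        Row(int id, List<String> tags) {
            this.id = id;
            this.tags = tags;
        }
    }

    // Scalar-style: exactly one output value per input row.
    static List<Integer> tagCounts(List<Row> rows) {
        List<Integer> out = new ArrayList<>();
        for (Row r : rows) {
            out.add(r.tags.size());
        }
        return out;
    }

    // Table-function-style (what UNNEST does): zero or more output rows per input
    // row, each paired with the outer row's columns, as in a CROSS JOIN UNNEST.
    static List<String> unnest(List<Row> rows) {
        List<String> out = new ArrayList<>();
        for (Row r : rows) {
            for (String tag : r.tags) {
                out.add(r.id + ":" + tag);
            }
        }
        return out;
    }
}
```

A row with an empty array contributes nothing to the unnested result. This fan-out is why UNNEST is planned as a correlate (join with a table function) rather than as an expression in a projection.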
[jira] [Commented] (FLINK-10204) Job is marked as FAILED after serialization exception
[ https://issues.apache.org/jira/browse/FLINK-10204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16591737#comment-16591737 ] ASF GitHub Bot commented on FLINK-10204: zentol commented on a change in pull request #6610: [FLINK-10204] - fix serialization/copy error for LatencyMarker records. URL: https://github.com/apache/flink/pull/6610#discussion_r212644961 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/LatencyMarker.java ## @@ -67,31 +67,40 @@ public int getSubtaskIndex() { // @Override - public boolean equals(Object o) { - if (this == o) { + public boolean equals(Object obj) { + if (this == obj) { return true; } - if (o == null || getClass() != o.getClass()){ + if (obj == null) { return false; } - - LatencyMarker that = (LatencyMarker) o; - - if (markedTime != that.markedTime) { + if (getClass() != obj.getClass()) { return false; } - if (operatorId != that.operatorId) { Review comment: let's reduce this change to only replacing `operatorId != that.operatorId`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Job is marked as FAILED after serialization exception > - > > Key: FLINK-10204 > URL: https://issues.apache.org/jira/browse/FLINK-10204 > Project: Flink > Issue Type: Bug >Reporter: Ben La Monica >Priority: Major > Labels: pull-request-available > > We have a long running flink job that eventually fails and is shut down due > to an internal serialization exception that we keep on getting. Here is the > stack trace: > {code:java} > 2018-08-23 18:39:48,199 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph - Job NAV Estimation > (4b5463d76167f9f5aac83a890e8a867d) switched from state FAILING to FAILED. 
> java.io.IOException: Corrupt stream, found tag: 127 > at > org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:219) > at > org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:49) > at > org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55) > at > org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:140) > at > org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:206) > at > org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:115) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703) > at java.lang.Thread.run(Thread.java:748){code} > > I think I have tracked down the issue to a mismatch in the > serialization/deserialization/copy code in the StreamElementSerializer with > regard to the LATENCY_MARKER. > The serialization logic writes 3 LONGs and an INT, but the copy logic only writes > (and reads) a LONG and 2 INTs. A test for the LatencyMarker fails with an > EOFException, and fixing the copy code makes the test pass again. > I've written a unit test that highlights the problem, and have written the > code to correct it. > I'll submit a PR that goes along with it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
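The serialize/copy mismatch described in the report can be illustrated with a minimal sketch in plain `java.io`. It assumes the field layout implied by "3 LONGs and an INT" (a `long` timestamp, an operator ID stored as two `long`s, and an `int` subtask index). `LatencyMarkerCodec`, its tag value and its method names are hypothetical and are not Flink's `StreamElementSerializer` API; the only point shown is that `copy` must read and write exactly the fields that `serialize` produced.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;

// Hypothetical stand-in for the LATENCY_MARKER branch of a stream-element serializer.
// Assumed layout: tag byte, markedTime (long), operator-id lower/upper parts (2 longs),
// subtaskIndex (int). This is NOT Flink's actual StreamElementSerializer.
final class LatencyMarkerCodec {
    static final int TAG_LATENCY_MARKER = 3; // assumed tag value, for illustration only

    static void serialize(long markedTime, long idLower, long idUpper, int subtaskIndex,
                          DataOutputStream out) throws IOException {
        out.write(TAG_LATENCY_MARKER);
        out.writeLong(markedTime);
        out.writeLong(idLower);
        out.writeLong(idUpper);
        out.writeInt(subtaskIndex);
    }

    // Copies one element without materializing it. It must consume exactly what
    // serialize() wrote: 3 longs + 1 int (28 payload bytes). Copying only
    // 1 long + 2 ints (16 bytes, the reported bug) leaves 12 bytes unread, so the
    // reader falls out of step with the element boundaries.
    static void copy(DataInputStream in, DataOutputStream out) throws IOException {
        int tag = in.readUnsignedByte();
        if (tag != TAG_LATENCY_MARKER) {
            throw new IOException("Corrupt stream, found tag: " + tag);
        }
        out.write(tag);
        out.writeLong(in.readLong()); // markedTime
        out.writeLong(in.readLong()); // operator id, lower part
        out.writeLong(in.readLong()); // operator id, upper part
        out.writeInt(in.readInt());   // subtaskIndex
    }

    // Copies `elements` consecutive markers from a serialized buffer.
    static byte[] copyAll(byte[] serialized, int elements) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized));
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buffer);
        for (int i = 0; i < elements; i++) {
            copy(in, out);
        }
        out.flush();
        return buffer.toByteArray();
    }
}

class LatencyMarkerCodecDemo {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buffer);
        LatencyMarkerCodec.serialize(1535049588199L, -1L, -1L, 1, out);
        LatencyMarkerCodec.serialize(1535049588200L, 42L, 7L, 0, out);
        out.flush();
        byte[] original = buffer.toByteArray();
        byte[] copied = LatencyMarkerCodec.copyAll(original, 2);
        // Each marker is 1 + 3 * 8 + 4 = 29 bytes.
        System.out.println(original.length + " bytes, copy identical: " + Arrays.equals(original, copied));
        // prints "58 bytes, copy identical: true"
    }
}
```

Copying two back-to-back markers is a useful check because a copy that under-reads one element would immediately misread the start of the next.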
[jira] [Commented] (FLINK-10204) Job is marked as FAILED after serialization exception
[ https://issues.apache.org/jira/browse/FLINK-10204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16591736#comment-16591736 ] ASF GitHub Bot commented on FLINK-10204: zentol commented on a change in pull request #6610: [FLINK-10204] - fix serialization/copy error for LatencyMarker records. URL: https://github.com/apache/flink/pull/6610#discussion_r212645193 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/LatencyMarker.java ## @@ -67,31 +67,40 @@ public int getSubtaskIndex() { // @Override - public boolean equals(Object o) { - if (this == o) { + public boolean equals(Object obj) { + if (this == obj) { return true; } - if (o == null || getClass() != o.getClass()){ + if (obj == null) { return false; } - - LatencyMarker that = (LatencyMarker) o; - - if (markedTime != that.markedTime) { + if (getClass() != obj.getClass()) { return false; } - if (operatorId != that.operatorId) { + LatencyMarker other = (LatencyMarker) obj; + if (markedTime != other.markedTime) { return false; } - return subtaskIndex == that.subtaskIndex; - + if (operatorId == null) { + if (other.operatorId != null) { + return false; + } + } else if (!operatorId.equals(other.operatorId)) { + return false; + } + if (subtaskIndex != other.subtaskIndex) { + return false; + } + return true; } @Override public int hashCode() { - int result = (int) (markedTime ^ (markedTime >>> 32)); Review comment: This change isn't necessary; please revert. While it is theoretically possible for operatorId to be null, in practice this won't happen. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] zentol commented on a change in pull request #6610: [FLINK-10204] - fix serialization/copy error for LatencyMarker records.
zentol commented on a change in pull request #6610: [FLINK-10204] - fix serialization/copy error for LatencyMarker records. URL: https://github.com/apache/flink/pull/6610#discussion_r212644961 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/LatencyMarker.java ## @@ -67,31 +67,40 @@ public int getSubtaskIndex() { // @Override - public boolean equals(Object o) { - if (this == o) { + public boolean equals(Object obj) { + if (this == obj) { return true; } - if (o == null || getClass() != o.getClass()){ + if (obj == null) { return false; } - - LatencyMarker that = (LatencyMarker) o; - - if (markedTime != that.markedTime) { + if (getClass() != obj.getClass()) { return false; } - if (operatorId != that.operatorId) { Review comment: let's reduce this change to only replacing `operatorId != that.operatorId`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] zentol commented on a change in pull request #6610: [FLINK-10204] - fix serialization/copy error for LatencyMarker records.
zentol commented on a change in pull request #6610: [FLINK-10204] - fix serialization/copy error for LatencyMarker records. URL: https://github.com/apache/flink/pull/6610#discussion_r212645193 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/LatencyMarker.java ## @@ -67,31 +67,40 @@ public int getSubtaskIndex() { // @Override - public boolean equals(Object o) { - if (this == o) { + public boolean equals(Object obj) { + if (this == obj) { return true; } - if (o == null || getClass() != o.getClass()){ + if (obj == null) { return false; } - - LatencyMarker that = (LatencyMarker) o; - - if (markedTime != that.markedTime) { + if (getClass() != obj.getClass()) { return false; } - if (operatorId != that.operatorId) { + LatencyMarker other = (LatencyMarker) obj; + if (markedTime != other.markedTime) { return false; } - return subtaskIndex == that.subtaskIndex; - + if (operatorId == null) { + if (other.operatorId != null) { + return false; + } + } else if (!operatorId.equals(other.operatorId)) { + return false; + } + if (subtaskIndex != other.subtaskIndex) { + return false; + } + return true; } @Override public int hashCode() { - int result = (int) (markedTime ^ (markedTime >>> 32)); Review comment: This change isn't necessary; please revert. While it is theoretically possible for operatorId to be null, in practice this won't happen. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] benlamonica commented on issue #6610: [FLINK-10204] - fix serialization/copy error for LatencyMarker records.
benlamonica commented on issue #6610: [FLINK-10204] - fix serialization/copy error for LatencyMarker records. URL: https://github.com/apache/flink/pull/6610#issuecomment-415769746 @StephanEwen and @zentol, I've replied to the review comments. LatencyMarker changed because its equals implementation was incorrect: it compared `operatorId != that.operatorId` (reference identity) instead of `!operatorId.equals(that.operatorId)`. If you want, I can revert that change and write a more complicated assertEquals statement in the test instead, but I believe it is the correct change. The other stream records implement equals correctly, so they still compare as equal after a copy. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
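The difference between comparing `operatorId` by reference and by value can be shown with a small, self-contained sketch. `IdSketch` and `MarkerSketch` are hypothetical stand-ins for `OperatorID` and `LatencyMarker`; the sketch assumes the ID type implements value-based `equals`/`hashCode` and that `operatorId` is never null. `equalsByReference` reproduces the original `!=` comparison, while `equals` follows the reviewer's suggestion of changing only that one comparison.

```java
import java.util.Objects;

// Hypothetical value-typed ID made of two longs (stand-in for an operator ID).
final class IdSketch {
    private final long lowerPart;
    private final long upperPart;

    IdSketch(long lowerPart, long upperPart) {
        this.lowerPart = lowerPart;
        this.upperPart = upperPart;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        IdSketch that = (IdSketch) o;
        return lowerPart == that.lowerPart && upperPart == that.upperPart;
    }

    @Override
    public int hashCode() {
        return Objects.hash(lowerPart, upperPart);
    }
}

// Hypothetical stand-in for a latency marker; operatorId is assumed to be non-null.
final class MarkerSketch {
    final long markedTime;
    final IdSketch operatorId;
    final int subtaskIndex;

    MarkerSketch(long markedTime, IdSketch operatorId, int subtaskIndex) {
        this.markedTime = markedTime;
        this.operatorId = operatorId;
        this.subtaskIndex = subtaskIndex;
    }

    // Original behavior: `==` on objects compares references, so a deserialized copy
    // (a new IdSketch instance holding the same values) never matches.
    boolean equalsByReference(MarkerSketch that) {
        return markedTime == that.markedTime
            && operatorId == that.operatorId
            && subtaskIndex == that.subtaskIndex;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        MarkerSketch that = (MarkerSketch) o;
        if (markedTime != that.markedTime) {
            return false;
        }
        if (!operatorId.equals(that.operatorId)) { // the only change: value comparison
            return false;
        }
        return subtaskIndex == that.subtaskIndex;
    }

    @Override
    public int hashCode() {
        // Consistent with equals: relies on operatorId's value-based hashCode.
        int result = (int) (markedTime ^ (markedTime >>> 32));
        result = 31 * result + operatorId.hashCode();
        result = 31 * result + subtaskIndex;
        return result;
    }
}

class MarkerEqualsDemo {
    public static void main(String[] args) {
        MarkerSketch original = new MarkerSketch(100L, new IdSketch(-1L, -1L), 1);
        // Simulates a serialize/deserialize round trip: same values, new instances.
        MarkerSketch copy = new MarkerSketch(100L, new IdSketch(-1L, -1L), 1);
        System.out.println(original.equalsByReference(copy)); // prints "false"
        System.out.println(original.equals(copy));            // prints "true"
    }
}
```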
[jira] [Commented] (FLINK-10204) Job is marked as FAILED after serialization exception
[ https://issues.apache.org/jira/browse/FLINK-10204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16591707#comment-16591707 ] ASF GitHub Bot commented on FLINK-10204: benlamonica commented on issue #6610: [FLINK-10204] - fix serialization/copy error for LatencyMarker records. URL: https://github.com/apache/flink/pull/6610#issuecomment-415769746 @StephanEwen and @zentol, I've replied to the review comments. LatencyMarker changed because its equals implementation was incorrect: it compared `operatorId != that.operatorId` (reference identity) instead of `!operatorId.equals(that.operatorId)`. If you want, I can revert that change and write a more complicated assertEquals statement in the test instead, but I believe it is the correct change. The other stream records implement equals correctly, so they still compare as equal after a copy. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10204) Job is marked as FAILED after serialization exception
[ https://issues.apache.org/jira/browse/FLINK-10204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16591702#comment-16591702 ] ASF GitHub Bot commented on FLINK-10204: benlamonica commented on a change in pull request #6610: [FLINK-10204] - fix serialization/copy error for LatencyMarker records. URL: https://github.com/apache/flink/pull/6610#discussion_r212639070 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerTest.java ## @@ -89,6 +90,9 @@ public void testSerialization() throws Exception { Watermark negativeWatermark = new Watermark(-4647654567676555876L); assertEquals(negativeWatermark, serializeAndDeserialize(negativeWatermark, serializer)); + + LatencyMarker latencyMarker = new LatencyMarker(System.currentTimeMillis(), new OperatorID(-1, -1), 1); + assertEquals(latencyMarker, serializeAndDeserialize(latencyMarker, serializer)); Review comment: The hashCode and equals changes are there to support this line. Without them, the two objects evaluate as not equal even though they hold the same values. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] benlamonica commented on a change in pull request #6610: [FLINK-10204] - fix serialization/copy error for LatencyMarker records.
benlamonica commented on a change in pull request #6610: [FLINK-10204] - fix serialization/copy error for LatencyMarker records. URL: https://github.com/apache/flink/pull/6610#discussion_r212639070 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerTest.java ## @@ -89,6 +90,9 @@ public void testSerialization() throws Exception { Watermark negativeWatermark = new Watermark(-4647654567676555876L); assertEquals(negativeWatermark, serializeAndDeserialize(negativeWatermark, serializer)); + + LatencyMarker latencyMarker = new LatencyMarker(System.currentTimeMillis(), new OperatorID(-1, -1), 1); + assertEquals(latencyMarker, serializeAndDeserialize(latencyMarker, serializer)); Review comment: The hashCode and equals changes are there to support this line. Without them, the two objects evaluate as not equal even though they hold the same values. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
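The test line discussed in this comment depends on a round trip that yields a *new* object which is still equal by value. A minimal sketch of such a `serializeAndDeserialize` helper, written against plain `java.io` streams rather than Flink's serializer and data-view classes, is shown below. `RoundTrip`, its `Writer`/`Reader` interfaces and `TimedMarker` are hypothetical names, and a `String` stands in for the operator ID.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Objects;

final class RoundTrip {
    // Hypothetical functional interfaces for writing and reading one value.
    interface Writer<T> { void write(T value, DataOutputStream out) throws IOException; }
    interface Reader<T> { T read(DataInputStream in) throws IOException; }

    // Writes the value to an in-memory buffer and reads it back, producing a fresh instance.
    static <T> T serializeAndDeserialize(T value, Writer<T> writer, Reader<T> reader) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buffer);
        writer.write(value, out);
        out.flush();
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()));
        return reader.read(in);
    }
}

// Hypothetical value class whose equals/hashCode compare fields.
final class TimedMarker {
    final long markedTime;
    final String operatorId; // stand-in for an operator ID
    final int subtaskIndex;

    TimedMarker(long markedTime, String operatorId, int subtaskIndex) {
        this.markedTime = markedTime;
        this.operatorId = operatorId;
        this.subtaskIndex = subtaskIndex;
    }

    static void write(TimedMarker m, DataOutputStream out) throws IOException {
        out.writeLong(m.markedTime);
        out.writeUTF(m.operatorId);
        out.writeInt(m.subtaskIndex);
    }

    // Arguments are evaluated left to right, matching the write order.
    static TimedMarker read(DataInputStream in) throws IOException {
        return new TimedMarker(in.readLong(), in.readUTF(), in.readInt());
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        TimedMarker that = (TimedMarker) o;
        // operatorId is compared with equals(): readUTF returns a new String instance,
        // so a reference comparison (==) would fail after the round trip.
        return markedTime == that.markedTime
            && subtaskIndex == that.subtaskIndex
            && operatorId.equals(that.operatorId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(markedTime, operatorId, subtaskIndex);
    }
}

class RoundTripDemo {
    public static void main(String[] args) throws IOException {
        TimedMarker marker = new TimedMarker(System.currentTimeMillis(), "op-1", 1);
        TimedMarker copy = RoundTrip.serializeAndDeserialize(marker, TimedMarker::write, TimedMarker::read);
        System.out.println((marker != copy) + " " + marker.equals(copy)); // prints "true true"
    }
}
```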
[jira] [Commented] (FLINK-10204) Job is marked as FAILED after serialization exception
[ https://issues.apache.org/jira/browse/FLINK-10204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16591641#comment-16591641 ] ASF GitHub Bot commented on FLINK-10204: benlamonica commented on a change in pull request #6610: [FLINK-10204] - fix serialization/copy error for LatencyMarker records. URL: https://github.com/apache/flink/pull/6610#discussion_r212626021 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/LatencyMarker.java ## @@ -67,31 +67,40 @@ public int getSubtaskIndex() { // @Override - public boolean equals(Object o) { - if (this == o) { + public boolean equals(Object obj) { + if (this == obj) { return true; } - if (o == null || getClass() != o.getClass()){ + if (obj == null) { return false; } - - LatencyMarker that = (LatencyMarker) o; - - if (markedTime != that.markedTime) { + if (getClass() != obj.getClass()) { return false; } - if (operatorId != that.operatorId) { + LatencyMarker other = (LatencyMarker) obj; + if (markedTime != other.markedTime) { return false; } - return subtaskIndex == that.subtaskIndex; - + if (operatorId == null) { + if (other.operatorId != null) { + return false; + } + } else if (!operatorId.equals(other.operatorId)) { + return false; + } + if (subtaskIndex != other.subtaskIndex) { + return false; + } + return true; } @Override public int hashCode() { - int result = (int) (markedTime ^ (markedTime >>> 32)); Review comment: This is just the way Eclipse generates hashCode. Since operatorId can be null, it does a null check before calling hashCode on it. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] benlamonica commented on a change in pull request #6610: [FLINK-10204] - fix serialization/copy error for LatencyMarker records.
benlamonica commented on a change in pull request #6610: [FLINK-10204] - fix serialization/copy error for LatencyMarker records. URL: https://github.com/apache/flink/pull/6610#discussion_r212626021 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/LatencyMarker.java ## @@ -67,31 +67,40 @@ public int getSubtaskIndex() { // @Override - public boolean equals(Object o) { - if (this == o) { + public boolean equals(Object obj) { + if (this == obj) { return true; } - if (o == null || getClass() != o.getClass()){ + if (obj == null) { return false; } - - LatencyMarker that = (LatencyMarker) o; - - if (markedTime != that.markedTime) { + if (getClass() != obj.getClass()) { return false; } - if (operatorId != that.operatorId) { + LatencyMarker other = (LatencyMarker) obj; + if (markedTime != other.markedTime) { return false; } - return subtaskIndex == that.subtaskIndex; - + if (operatorId == null) { + if (other.operatorId != null) { + return false; + } + } else if (!operatorId.equals(other.operatorId)) { + return false; + } + if (subtaskIndex != other.subtaskIndex) { + return false; + } + return true; } @Override public int hashCode() { - int result = (int) (markedTime ^ (markedTime >>> 32)); Review comment: This is just the way Eclipse generates hashCode. Since operatorId can be null, it does a null check before calling hashCode on it. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
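If `operatorId` is allowed to be null, the null checks that the Eclipse-generated code spells out by hand can be written more compactly with `java.util.Objects.equals` (true for two nulls, false if only one side is null) and `Objects.hashCode` (0 for a null argument). The sketch below uses a hypothetical `NullableMarker` class, with a `String` standing in for the operator ID. Whether null handling is needed at all is exactly what the reviewer questions in this thread.

```java
import java.util.Objects;

// Hypothetical marker whose operatorId may be null (String stands in for an operator ID).
final class NullableMarker {
    final long markedTime;
    final String operatorId;
    final int subtaskIndex;

    NullableMarker(long markedTime, String operatorId, int subtaskIndex) {
        this.markedTime = markedTime;
        this.operatorId = operatorId;
        this.subtaskIndex = subtaskIndex;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        NullableMarker other = (NullableMarker) obj;
        // Objects.equals replaces the nested "operatorId == null ? ... : equals(...)" checks.
        return markedTime == other.markedTime
            && subtaskIndex == other.subtaskIndex
            && Objects.equals(operatorId, other.operatorId);
    }

    @Override
    public int hashCode() {
        int result = (int) (markedTime ^ (markedTime >>> 32));
        // Objects.hashCode(null) is 0, the same as "operatorId == null ? 0 : operatorId.hashCode()".
        result = 31 * result + Objects.hashCode(operatorId);
        result = 31 * result + subtaskIndex;
        return result;
    }
}

class NullableMarkerDemo {
    public static void main(String[] args) {
        NullableMarker a = new NullableMarker(100L, null, 1);
        NullableMarker b = new NullableMarker(100L, null, 1);
        NullableMarker c = new NullableMarker(100L, "op-1", 1);
        System.out.println(a.equals(b) + " " + a.equals(c) + " " + c.equals(a)); // prints "true false false"
    }
}
```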
[jira] [Commented] (FLINK-9713) Support versioned joins in planning phase
[ https://issues.apache.org/jira/browse/FLINK-9713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16591639#comment-16591639 ] ASF GitHub Bot commented on FLINK-9713: --- pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins URL: https://github.com/apache/flink/pull/6299#discussion_r212625803 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/VersionedJoin.scala ## @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.functions.FlatJoinFunction +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.CRowWrappingCollector +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.typeutils.TypeCheckUtils._ +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +class VersionedJoin( +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +genJoinFuncName: String, +genJoinFuncCode: String, +queryConfig: StreamQueryConfig) + extends CoProcessFunction[CRow, CRow, CRow] + with Compiler[FlatJoinFunction[Row, Row, Row]] + with Logging { + + validateEqualsHashCode("join", leftType) + validateEqualsHashCode("join", rightType) + + protected var rightState: ValueState[Row] = _ + protected var cRowWrapper: CRowWrappingCollector = _ + + protected var joinFunction: FlatJoinFunction[Row, Row, Row] = _ + + override def open(parameters: Configuration): Unit = { +val clazz = compile( + getRuntimeContext.getUserCodeClassLoader, + genJoinFuncName, + genJoinFuncCode) + +joinFunction = clazz.newInstance() + +val rightStateDescriptor = new ValueStateDescriptor[Row]("right", rightType) Review comment: Could we do it as a follow-up task to minimize PR size? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Support versioned joins in planning phase > - > > Key: FLINK-9713 > URL: https://issues.apache.org/jira/browse/FLINK-9713 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.5.0 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Major > Labels: pull-request-available > > Queries like: > {code:java} > SELECT > o.amount * r.rate > FROM > Orders AS o, > LATERAL TABLE (Rates(o.rowtime)) AS r > WHERE o.currency = r.currency{code} > should evaluate to a valid plan with a versioned join plan node. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
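The behavior a versioned join is meant to provide for a query like the one above can be approximated with a plain-Java, single-threaded simulation. This is a hedged sketch of processing-time semantics only, matching the pull request title "Support processing time versioned joins". Each `Rates` update overwrites the current version for its currency, similar to the `ValueState` named `"right"` in `VersionedJoin` (assumed here to be keyed by currency), and each order is joined against whatever version is current when it arrives. The class and method names are hypothetical. Amounts and rates are integers to avoid floating-point noise. The event-time variant implied by `Rates(o.rowtime)`, which picks the version valid at the order's timestamp, is not modeled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of processing-time versioned-join semantics for the
// Orders/Rates query in the issue: Rates keeps only its latest row per currency.
final class ProctimeVersionedJoinSketch {
    // Latest rate per currency; plays the role of per-key "right" state.
    private final Map<String, Long> latestRateByCurrency = new HashMap<>();
    private final List<Long> results = new ArrayList<>();

    // Right input (Rates): a new version replaces the previous one for the same key.
    void onRate(String currency, long rate) {
        latestRateByCurrency.put(currency, rate);
    }

    // Left input (Orders): join with the version that is current on arrival.
    // Inner-join semantics: an order with no known rate produces no output.
    void onOrder(String currency, long amount) {
        Long rate = latestRateByCurrency.get(currency);
        if (rate != null) {
            results.add(amount * rate); // o.amount * r.rate
        }
    }

    List<Long> results() {
        return results;
    }
}

class ProctimeVersionedJoinDemo {
    public static void main(String[] args) {
        ProctimeVersionedJoinSketch join = new ProctimeVersionedJoinSketch();
        join.onOrder("EUR", 10); // no EUR rate yet: no output
        join.onRate("EUR", 2);
        join.onOrder("EUR", 10); // joined with rate 2 -> 20
        join.onRate("EUR", 3);   // new version replaces rate 2
        join.onOrder("EUR", 5);  // joined with rate 3 -> 15
        join.onOrder("USD", 7);  // no USD rate: no output
        System.out.println(join.results()); // prints "[20, 15]"
    }
}
```

Note that the rate update does not retroactively change the earlier result of 20. Under processing-time semantics, already-emitted join results are never revised.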
[jira] [Commented] (FLINK-9713) Support versioned joins in planning phase
[ https://issues.apache.org/jira/browse/FLINK-9713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16591638#comment-16591638 ] ASF GitHub Bot commented on FLINK-9713: --- pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins URL: https://github.com/apache/flink/pull/6299#discussion_r208833159 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToVersionedJoinRule.scala ## @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.rules.logical + +import java.util +import java.util.Collections + +import org.apache.calcite.plan.RelOptRule.{any, none, operand, some} +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.core.TableFunctionScan +import org.apache.calcite.rel.logical.LogicalCorrelate +import org.apache.calcite.rex._ +import org.apache.calcite.sql.fun.SqlCastFunction +import org.apache.flink.table.api.{Table, ValidationException} +import org.apache.flink.table.functions.TableVersionFunction +import org.apache.flink.table.functions.sql.ProctimeSqlFunction +import org.apache.flink.table.functions.utils.TableSqlFunction +import org.apache.flink.table.plan.logical.rel.LogicalVersionedJoin +import org.apache.flink.table.plan.schema.TimeIndicatorRelDataType +import org.apache.flink.table.plan.util.RexDefaultVisitor +import org.apache.flink.table.typeutils.TypeStringUtils +import org.apache.flink.util.Preconditions.checkState + +class LogicalCorrelateToVersionedJoinRule + extends RelOptRule( +operand(classOf[LogicalCorrelate], + some( +operand(classOf[RelNode], any()), +operand(classOf[TableFunctionScan], none()))), +"LogicalCorrelateToVersionedJoinRule") { + + override def onMatch(call: RelOptRuleCall): Unit = { +val logicalCorrelate: LogicalCorrelate = call.rel(0) +val leftNode: RelNode = call.rel(1) +val rightTableFunctionScan: TableFunctionScan = call.rel(2) + +val cluster = logicalCorrelate.getCluster + +new GetTableVersionFunctionCall(cluster.getRexBuilder, leftNode) + .visit(rightTableFunctionScan.getCall) match { + case Some(rightVersionedTableCall) => +val underlyingTableHistory: Table = rightVersionedTableCall.tableVersionFunction.table +val relBuilder = this.relBuilderFactory.create( + cluster, + underlyingTableHistory.relBuilder.getRelOptSchema) +val rexBuilder = cluster.getRexBuilder + +val rightNode: RelNode =
underlyingTableHistory.logicalPlan.toRelNode(relBuilder) + +val rightVersionExpression = createRightExpression( + rexBuilder, + leftNode, + rightNode, + rightVersionedTableCall.tableVersionFunction.versionField) + +val rightPrimaryKeyExpression = createRightExpression( + rexBuilder, + leftNode, + rightNode, + rightVersionedTableCall.tableVersionFunction.primaryKey) + +relBuilder.push( + if (rightVersionedTableCall.tableVersionFunction.isProctime) { +LogicalVersionedJoin.create( + rexBuilder, + cluster, + logicalCorrelate.getTraitSet, + leftNode, + rightNode, + rightVersionedTableCall.versionExpression, + rightPrimaryKeyExpression) + } + else { +LogicalVersionedJoin.create( + rexBuilder, + cluster, + logicalCorrelate.getTraitSet, + leftNode, + rightNode, + rightVersionedTableCall.versionExpression, + rightVersionExpression, + rightPrimaryKeyExpression) + }) +call.transformTo(relBuilder.build()) + case None => +} + } + + private def createRightExpression( + rexBuilder: RexBuilder, + leftNode: RelNode, + rightNode: RelNode, + field: String): RexNode = { +val rightReference
[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins URL: https://github.com/apache/flink/pull/6299#discussion_r212625803 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/VersionedJoin.scala ## @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.join + +import org.apache.flink.api.common.functions.FlatJoinFunction +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.CRowWrappingCollector +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.typeutils.TypeCheckUtils._ +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +class VersionedJoin( +leftType: TypeInformation[Row], +rightType: TypeInformation[Row], +genJoinFuncName: String, +genJoinFuncCode: String, +queryConfig: StreamQueryConfig) + extends CoProcessFunction[CRow, CRow, CRow] + with Compiler[FlatJoinFunction[Row, Row, Row]] + with Logging { + + validateEqualsHashCode("join", leftType) + validateEqualsHashCode("join", rightType) + + protected var rightState: ValueState[Row] = _ + protected var cRowWrapper: CRowWrappingCollector = _ + + protected var joinFunction: FlatJoinFunction[Row, Row, Row] = _ + + override def open(parameters: Configuration): Unit = { +val clazz = compile( + getRuntimeContext.getUserCodeClassLoader, + genJoinFuncName, + genJoinFuncCode) + +joinFunction = clazz.newInstance() + +val rightStateDescriptor = new ValueStateDescriptor[Row]("right", rightType) Review comment: Could we do it as a follow-up task to keep this PR small?
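The processing-time `VersionedJoin` above keeps the right side in a `ValueState[Row]` named "right" and joins each arriving left row against it. A minimal plain-Java sketch of that idea follows. It assumes the function runs on a stream keyed by the join key (so there is one state value per key), uses a `HashMap` as a stand-in for keyed state and strings as a stand-in for `Row`; `ProcTimeVersionedJoinSketch` and its methods are hypothetical names, not Flink API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a processing-time versioned join; not Flink's VersionedJoin class.
class ProcTimeVersionedJoinSketch {
    // Stand-in for the keyed ValueState[Row] named "right": the latest right row per join key.
    private final Map<String, String> latestRight = new HashMap<>();
    // Stand-in for the output collector.
    private final List<String> output = new ArrayList<>();

    // Right (versioned) side: a new row replaces the previous version for its key.
    void processRight(String key, String rightRow) {
        latestRight.put(key, rightRow);
    }

    // Left side: join with whatever version is current at processing time.
    // With no version stored yet, the left row produces nothing (inner-join semantics).
    void processLeft(String key, String leftRow) {
        String right = latestRight.get(key);
        if (right != null) {
            output.add(leftRow + "," + right);
        }
    }

    List<String> output() {
        return output;
    }
}
```

In this sketch a left row only ever sees the newest version of the right side; the event-time branch of the planner rule shown earlier additionally passes a version expression, which this sketch does not model.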
[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins URL: https://github.com/apache/flink/pull/6299#discussion_r208833159 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToVersionedJoinRule.scala ## @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.rules.logical + +import java.util +import java.util.Collections + +import org.apache.calcite.plan.RelOptRule.{any, none, operand, some} +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.core.TableFunctionScan +import org.apache.calcite.rel.logical.LogicalCorrelate +import org.apache.calcite.rex._ +import org.apache.calcite.sql.fun.SqlCastFunction +import org.apache.flink.table.api.{Table, ValidationException} +import org.apache.flink.table.functions.TableVersionFunction +import org.apache.flink.table.functions.sql.ProctimeSqlFunction +import org.apache.flink.table.functions.utils.TableSqlFunction +import org.apache.flink.table.plan.logical.rel.LogicalVersionedJoin +import org.apache.flink.table.plan.schema.TimeIndicatorRelDataType +import org.apache.flink.table.plan.util.RexDefaultVisitor +import org.apache.flink.table.typeutils.TypeStringUtils +import org.apache.flink.util.Preconditions.checkState + +class LogicalCorrelateToVersionedJoinRule + extends RelOptRule( +operand(classOf[LogicalCorrelate], + some( +operand(classOf[RelNode], any()), +operand(classOf[TableFunctionScan], none()))), +"LogicalCorrelateToVersionedJoinRule") { + + override def onMatch(call: RelOptRuleCall): Unit = { +val logicalCorrelate: LogicalCorrelate = call.rel(0) +val leftNode: RelNode = call.rel(1) +val rightTableFunctionScan: TableFunctionScan = call.rel(2) + +val cluster = logicalCorrelate.getCluster + +new GetTableVersionFunctionCall(cluster.getRexBuilder, leftNode) + .visit(rightTableFunctionScan.getCall) match { + case Some(rightVersionedTableCall) => +val underlyingTableHistory: Table = rightVersionedTableCall.tableVersionFunction.table +val relBuilder = this.relBuilderFactory.create( + cluster, + underlyingTableHistory.relBuilder.getRelOptSchema) +val rexBuilder = cluster.getRexBuilder + +val rightNode: RelNode =
underlyingTableHistory.logicalPlan.toRelNode(relBuilder) + +val rightVersionExpression = createRightExpression( + rexBuilder, + leftNode, + rightNode, + rightVersionedTableCall.tableVersionFunction.versionField) + +val rightPrimaryKeyExpression = createRightExpression( + rexBuilder, + leftNode, + rightNode, + rightVersionedTableCall.tableVersionFunction.primaryKey) + +relBuilder.push( + if (rightVersionedTableCall.tableVersionFunction.isProctime) { +LogicalVersionedJoin.create( + rexBuilder, + cluster, + logicalCorrelate.getTraitSet, + leftNode, + rightNode, + rightVersionedTableCall.versionExpression, + rightPrimaryKeyExpression) + } + else { +LogicalVersionedJoin.create( + rexBuilder, + cluster, + logicalCorrelate.getTraitSet, + leftNode, + rightNode, + rightVersionedTableCall.versionExpression, + rightVersionExpression, + rightPrimaryKeyExpression) + }) +call.transformTo(relBuilder.build()) + case None => +} + } + + private def createRightExpression( + rexBuilder: RexBuilder, + leftNode: RelNode, + rightNode: RelNode, + field: String): RexNode = { +val rightReferencesOffset = leftNode.getRowType.getFieldCount +val rightDataTypeField = rightNode.getRowType.getField(field, false, false) +rexBuilder.makeInputRef( + rightDataTypeField.getType, rightReferencesOffset + rightDataTypeField.getIndex) + } +} + +ob
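`createRightExpression` above turns a field name of the versioned (right) table into an input reference on the correlate's combined row, whose fields are all left fields followed by all right fields; hence `rightReferencesOffset + rightDataTypeField.getIndex`. The index arithmetic can be sketched in plain Java as follows. `CorrelateIndexSketch.rightInputRefIndex` is a hypothetical helper that models a row type as a list of field names. Like `getField(field, false, false)`, it matches names case-insensitively, but it throws on a missing field, whereas Calcite's `getField` returns `null` in that case.

```java
import java.util.List;

// Hypothetical helper mirroring the offset arithmetic in createRightExpression.
final class CorrelateIndexSketch {
    private CorrelateIndexSketch() {}

    // Returns the position of `field` in the combined (left ++ right) row.
    static int rightInputRefIndex(List<String> leftFields, List<String> rightFields, String field) {
        for (int i = 0; i < rightFields.size(); i++) {
            // Case-insensitive match, like getField(field, false, false).
            if (rightFields.get(i).equalsIgnoreCase(field)) {
                // Right fields start after every left field in the correlate's output row.
                return leftFields.size() + i;
            }
        }
        throw new IllegalArgumentException("Unknown right-side field: " + field);
    }
}
```

For a left side with three fields, the first right-side field therefore gets input reference 3.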
[jira] [Commented] (FLINK-10204) Job is marked as FAILED after serialization exception
[ https://issues.apache.org/jira/browse/FLINK-10204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16591630#comment-16591630 ] ASF GitHub Bot commented on FLINK-10204: benlamonica commented on a change in pull request #6610: [FLINK-10204] - fix serialization/copy error for LatencyMarker records. URL: https://github.com/apache/flink/pull/6610#discussion_r212623923 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/LatencyMarker.java ## @@ -67,31 +67,40 @@ public int getSubtaskIndex() { // @Override - public boolean equals(Object o) { - if (this == o) { + public boolean equals(Object obj) { Review comment: Yes, the prior implementation did this: `if (operatorId != that.operatorId) {` instead of calling equals(), which returned false whenever the two operator IDs were equal but not the same object (for example, after the byte stream is copied). These changes simply come from Eclipse generating the equals and hashCode methods. I needed them so that the unit test would actually pass, since it uses the equals method in its assertion. > Job is marked as FAILED after serialization exception > - > > Key: FLINK-10204 > URL: https://issues.apache.org/jira/browse/FLINK-10204 > Project: Flink > Issue Type: Bug >Reporter: Ben La Monica >Priority: Major > Labels: pull-request-available > > We have a long-running Flink job that eventually fails and is shut down due > to an internal serialization exception that we keep getting. Here is the > stack trace: > {code:java} > 2018-08-23 18:39:48,199 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph - Job NAV Estimation > (4b5463d76167f9f5aac83a890e8a867d) switched from state FAILING to FAILED.
> java.io.IOException: Corrupt stream, found tag: 127 > at > org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:219) > at > org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:49) > at > org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55) > at > org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:140) > at > org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:206) > at > org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:115) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703) > at java.lang.Thread.run(Thread.java:748){code} > > I think I have tracked the issue down to a mismatch in the > serialization/deserialization/copy code in the StreamElementSerializer with > regard to the LATENCY_MARKER. > The serialization logic writes 3 LONGs and an INT, but the copy logic only writes > (and reads) a LONG and 2 INTs. A unit test for the LatencyMarker throws an > EOFException, and fixing the copy code makes the test pass again. > I've written a unit test that highlights the problem, and have written the > code to correct it. > I'll submit a PR that goes along with it.
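The two problems discussed above (the copy path not matching the serialization layout, and `equals` comparing the operator ID with `!=`) can be illustrated together with a simplified stand-in for `LatencyMarker`. This is a sketch, not Flink's `LatencyMarker` or `StreamElementSerializer`. `MarkerSketch` and `OperatorIdSketch` are hypothetical classes. The assumption that the three longs are the marked time plus the two halves of an operator ID is ours; the report only states "3 LONGs and an INT".

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Objects;

// Hypothetical two-long identifier standing in for an operator ID.
final class OperatorIdSketch {
    final long lower;
    final long upper;

    OperatorIdSketch(long lower, long upper) {
        this.lower = lower;
        this.upper = upper;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) return true;
        if (!(obj instanceof OperatorIdSketch)) return false;
        OperatorIdSketch that = (OperatorIdSketch) obj;
        return lower == that.lower && upper == that.upper;
    }

    @Override
    public int hashCode() {
        return Objects.hash(lower, upper);
    }
}

// Hypothetical, simplified marker; not Flink's LatencyMarker.
final class MarkerSketch {
    final long markedTime;
    final OperatorIdSketch operatorId;
    final int subtaskIndex;

    MarkerSketch(long markedTime, OperatorIdSketch operatorId, int subtaskIndex) {
        this.markedTime = markedTime;
        this.operatorId = operatorId;
        this.subtaskIndex = subtaskIndex;
    }

    // Wire layout: three longs followed by one int.
    void serialize(DataOutputStream out) throws IOException {
        out.writeLong(markedTime);
        out.writeLong(operatorId.lower);
        out.writeLong(operatorId.upper);
        out.writeInt(subtaskIndex);
    }

    static MarkerSketch deserialize(DataInputStream in) throws IOException {
        long time = in.readLong();
        OperatorIdSketch id = new OperatorIdSketch(in.readLong(), in.readLong());
        return new MarkerSketch(time, id, in.readInt());
    }

    // A byte-level copy must mirror serialize() exactly; copying one long and
    // two ints instead would leave the reader out of step with the stream.
    static void copy(DataInputStream in, DataOutputStream out) throws IOException {
        out.writeLong(in.readLong());
        out.writeLong(in.readLong());
        out.writeLong(in.readLong());
        out.writeInt(in.readInt());
    }

    // Serializes, copies the raw bytes, then deserializes the copy.
    static MarkerSketch roundTripViaCopy(MarkerSketch marker) throws IOException {
        ByteArrayOutputStream original = new ByteArrayOutputStream();
        marker.serialize(new DataOutputStream(original));
        ByteArrayOutputStream copied = new ByteArrayOutputStream();
        copy(new DataInputStream(new ByteArrayInputStream(original.toByteArray())),
             new DataOutputStream(copied));
        return deserialize(new DataInputStream(new ByteArrayInputStream(copied.toByteArray())));
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) return true;
        if (!(obj instanceof MarkerSketch)) return false;
        MarkerSketch that = (MarkerSketch) obj;
        // Value comparison: `operatorId != that.operatorId` would compare references
        // and fail for a deserialized copy.
        return markedTime == that.markedTime
            && subtaskIndex == that.subtaskIndex
            && Objects.equals(operatorId, that.operatorId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(markedTime, operatorId, subtaskIndex);
    }
}
```

A marker that goes through `roundTripViaCopy` comes back as a different object with a different `OperatorIdSketch` instance. It only compares equal because `equals` uses `Objects.equals`, which mirrors why the unit test in the PR needed the regenerated `equals`.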
[jira] [Commented] (FLINK-10136) Add REPEAT supported in Table API and SQL
[ https://issues.apache.org/jira/browse/FLINK-10136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16591597#comment-16591597 ] ASF GitHub Bot commented on FLINK-10136: xccui commented on a change in pull request #6597: [FLINK-10136] [table] Add REPEAT supported in Table API and SQL URL: https://github.com/apache/flink/pull/6597#discussion_r212618421 ## File path: docs/dev/table/functions.md ## @@ -2614,6 +2626,18 @@ STRING.rtrim() E.g., 'This is a test String. '.rtrim() returns "This is a test String.". + + + +{% highlight java %} +STRING.repeat(INT) +{% endhighlight %} + + +Returns a string that repeats the base STRING INT times. +E.g., "This is a test String. ".repeat(2) returns "This is a test String.This is a test String.". Review comment: There's an additional space after the full stop. (I've fixed it in my branch, so just leave it as is.) > Add REPEAT supported in Table API and SQL > - > > Key: FLINK-10136 > URL: https://issues.apache.org/jira/browse/FLINK-10136 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: vinoyang >Assignee: vinoyang >Priority: Minor > Labels: pull-request-available > > Oracle: > [https://docs.oracle.com/cd/E17952_01/mysql-5.1-en/string-functions.html#function_repeat] > MySQL: > https://dev.mysql.com/doc/refman/5.5/en/string-functions.html#function_repeat
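The reviewer's remark concerns the documented example: when the base string ends in a blank, every repetition keeps that blank, so the expected output shown in the diff does not match its input. The following Java sketch makes the semantics concrete. `RepeatSketch.repeat` is a hypothetical helper, not the Flink function. It follows MySQL's documented REPEAT behaviour: a count below 1 yields an empty string and a NULL string yields NULL.

```java
// Hypothetical helper illustrating REPEAT semantics; not Flink's implementation.
final class RepeatSketch {
    private RepeatSketch() {}

    // Concatenates `base` `times` times. As documented for MySQL's REPEAT,
    // a count below 1 yields "" and a NULL string yields NULL.
    static String repeat(String base, int times) {
        if (base == null) {
            return null;
        }
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < times; i++) {
            sb.append(base);
        }
        return sb.toString();
    }
}
```

With this behaviour, `repeat("This is a test String. ", 2)` keeps the blank after each full stop, including a trailing one.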
[jira] [Commented] (FLINK-10211) Time indicators are not always materialised for LogicalJoin
[ https://issues.apache.org/jira/browse/FLINK-10211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16591587#comment-16591587 ] Xingcan Cui commented on FLINK-10211: - Hi [~pnowojski], I guess the two tickets you just filed are related to FLINK-8897, right? > Time indicators are not always materialised for LogicalJoin > --- > > Key: FLINK-10211 > URL: https://issues.apache.org/jira/browse/FLINK-10211 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.6.0 >Reporter: Piotr Nowojski >Priority: Major > > Currently > {{org.apache.flink.table.calcite.RelTimeIndicatorConverter#visit(LogicalJoin)}} > correctly handles only windowed joins. The output of non-windowed joins > shouldn't contain any time indicators. > CC [~twalthr]
[jira] [Commented] (FLINK-9970) Add ASCII/CHR function for table/sql API
[ https://issues.apache.org/jira/browse/FLINK-9970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16591576#comment-16591576 ] ASF GitHub Bot commented on FLINK-9970: --- yanghua commented on issue #6432: [FLINK-9970] [table] Add ASCII/CHR function for table/sql API URL: https://github.com/apache/flink/pull/6432#issuecomment-415745275 cc @xccui I have rebased this PR; please review it again. > Add ASCII/CHR function for table/sql API > > > Key: FLINK-9970 > URL: https://issues.apache.org/jira/browse/FLINK-9970 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: vinoyang >Assignee: vinoyang >Priority: Minor > Labels: pull-request-available > > For the ASCII function, > refer to: > [https://dev.mysql.com/doc/refman/8.0/en/string-functions.html#function_ascii] > For the CHR function: > this function converts an ASCII code to a character; > refer to: [https://doc.ispirer.com/sqlways/Output/SQLWays-1-071.html] > Because "CHAR" is a keyword in many databases, we use the "CHR" > keyword.
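The two functions are inverses on single-byte codes. The following Java sketch uses hypothetical helpers in `AsciiChrSketch`, not the code in the PR. `ascii` follows the linked MySQL description: it returns the numeric value of the leftmost character, 0 for an empty string, and NULL for NULL. Note that Java's `charAt` returns a UTF-16 code unit, whereas MySQL documents ASCII for 8-bit characters. For `chr`, limiting codes to 0..255 and returning null otherwise is purely an assumption of this sketch, since the issue does not specify out-of-range behaviour.

```java
// Hypothetical helpers illustrating ASCII and CHR; not Flink's implementation.
final class AsciiChrSketch {
    private AsciiChrSketch() {}

    // ASCII: numeric code of the leftmost character, 0 for "", null for null.
    static Integer ascii(String s) {
        if (s == null) {
            return null;
        }
        return s.isEmpty() ? 0 : (int) s.charAt(0);
    }

    // CHR: one-character string for a code. The 0..255 range check is an assumption.
    static String chr(int code) {
        if (code < 0 || code > 255) {
            return null;
        }
        return String.valueOf((char) code);
    }
}
```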
[jira] [Created] (FLINK-10211) Time indicators are not always materialised for LogicalJoin
Piotr Nowojski created FLINK-10211: -- Summary: Time indicators are not always materialised for LogicalJoin Key: FLINK-10211 URL: https://issues.apache.org/jira/browse/FLINK-10211 Project: Flink Issue Type: Bug Components: Table API & SQL Affects Versions: 1.6.0 Reporter: Piotr Nowojski Currently {{org.apache.flink.table.calcite.RelTimeIndicatorConverter#visit(LogicalJoin)}} correctly handles only windowed joins. The output of non-windowed joins shouldn't contain any time indicators. CC [~twalthr]
[jira] [Created] (FLINK-10210) Time indicators are not always materialised for LogicalCorrelate
Piotr Nowojski created FLINK-10210: -- Summary: Time indicators are not always materialised for LogicalCorrelate Key: FLINK-10210 URL: https://issues.apache.org/jira/browse/FLINK-10210 Project: Flink Issue Type: Bug Components: Table API & SQL Affects Versions: 1.6.0 Reporter: Piotr Nowojski Currently {{org.apache.flink.table.calcite.RelTimeIndicatorConverter#visit(LogicalCorrelate)}} supports only the case where the right side is a {{LogicalTableFunctionScan}}. That is not always the case: for example, in {{org.apache.flink.table.api.stream.table.CorrelateTest#testFilter}} the LogicalFilter node is pushed down to the right side of the {{LogicalCorrelate}}. CC [~twalthr]
[jira] [Assigned] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask
[ https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske reassigned FLINK-10205: - Assignee: JIN SUN > Batch Job: InputSplit Fault tolerant for DataSourceTask > --- > > Key: FLINK-10205 > URL: https://issues.apache.org/jira/browse/FLINK-10205 > Project: Flink > Issue Type: Sub-task > Components: JobManager >Reporter: JIN SUN >Assignee: JIN SUN >Priority: Major > Original Estimate: 168h > Remaining Estimate: 168h > > Today, a DataSourceTask pulls InputSplits from the JobManager to achieve better > performance. However, when a DataSourceTask fails and is rerun, it will not get > the same splits as its previous attempt. This can introduce inconsistent > results or even data corruption. > Furthermore, if two executions run at the same time (in a batch > scenario), these two executions should process the same splits. > We need to fix this issue to make the inputs of a DataSourceTask > deterministic. The proposal is to save all splits in the ExecutionVertex and > let the DataSourceTask pull splits from there. >
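The proposal (record the splits handed to each ExecutionVertex so that a rerun pulls the same ones) can be sketched in plain Java. This is an illustration under assumptions, not the eventual Flink design. `SplitAssignmentSketch` is hypothetical, splits are plain strings, vertices are identified by an integer index, and each attempt first replays the splits already recorded for its vertex before drawing new ones from the shared pool.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: remember which splits each vertex consumed so that a rerun
// (or a concurrent attempt) of the same vertex receives the same splits.
final class SplitAssignmentSketch {
    private final Deque<String> unassigned;                                   // shared split pool
    private final Map<Integer, List<String>> assignedPerVertex = new HashMap<>(); // per-vertex log
    private final Map<String, Integer> cursorPerAttempt = new HashMap<>();      // replay position

    SplitAssignmentSketch(List<String> splits) {
        this.unassigned = new ArrayDeque<>(splits);
    }

    // Returns the next split for (vertex, attempt), or null when no split is left.
    String nextSplit(int vertex, int attempt) {
        List<String> log = assignedPerVertex.computeIfAbsent(vertex, v -> new ArrayList<>());
        String cursorKey = vertex + "#" + attempt;
        int pos = cursorPerAttempt.getOrDefault(cursorKey, 0);
        String split;
        if (pos < log.size()) {
            split = log.get(pos);        // replay a split already recorded for this vertex
        } else {
            split = unassigned.poll();   // take a fresh split and record it for the vertex
            if (split == null) {
                return null;
            }
            log.add(split);
        }
        cursorPerAttempt.put(cursorKey, pos + 1);
        return split;
    }
}
```

Here a second attempt of vertex 0 sees exactly the splits the first attempt consumed, in the same order, while other vertices are unaffected.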
[jira] [Commented] (FLINK-3875) Add a TableSink for Elasticsearch
[ https://issues.apache.org/jira/browse/FLINK-3875?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16591538#comment-16591538 ] ASF GitHub Bot commented on FLINK-3875: --- twalthr commented on issue #6611: [FLINK-3875] [connectors] Add an upsert table sink factory for Elasticsearch URL: https://github.com/apache/flink/pull/6611#issuecomment-415737635 CC @tzulitai @dawidwys > Add a TableSink for Elasticsearch > - > > Key: FLINK-3875 > URL: https://issues.apache.org/jira/browse/FLINK-3875 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors, Table API & SQL >Reporter: Fabian Hueske >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > Add a TableSink that writes data to Elasticsearch
[jira] [Commented] (FLINK-3875) Add a TableSink for Elasticsearch
[ https://issues.apache.org/jira/browse/FLINK-3875?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16591537#comment-16591537 ] ASF GitHub Bot commented on FLINK-3875: --- twalthr opened a new pull request #6611: [FLINK-3875] [connectors] Add an upsert table sink factory for Elasticsearch URL: https://github.com/apache/flink/pull/6611 ## What is the purpose of the change This PR adds full support for using Elasticsearch with the Table & SQL API as well as the SQL Client. ## Brief change log This PR includes: - Elasticsearch 6 upsert table sink - Elasticsearch 6 table factory - Elasticsearch table descriptors & validators - Unit tests - SQL Client end-to-end test - Website documentation ## Verifying this change - Unit tests for descriptors, factory, and sink - SQL Client end-to-end test extended with Elasticsearch sink ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): yes - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): yes - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? yes - If yes, how is the feature documented? docs > Add a TableSink for Elasticsearch > - > > Key: FLINK-3875 > URL: https://issues.apache.org/jira/browse/FLINK-3875 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors, Table API & SQL >Reporter: Fabian Hueske >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > Add a TableSink that writes data to Elasticsearch
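An upsert table sink applies a stream of (flag, row) pairs to an external store keyed by the row's key fields. The PR text does not spell out the encoding, so the following plain-Java sketch makes two assumptions. First, it follows Flink's usual upsert-stream convention (`true` = insert or update, `false` = delete). Second, it builds the Elasticsearch document ID by joining the key fields with a delimiter. `UpsertSinkSketch` is hypothetical and an in-memory map stands in for the index; none of this is the connector's actual API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical in-memory model of an upsert sink; not the Elasticsearch connector API.
final class UpsertSinkSketch {
    private final int[] keyFieldIndices;
    private final String keyDelimiter;
    private final Map<String, String[]> index = new HashMap<>(); // document id -> document

    UpsertSinkSketch(int[] keyFieldIndices, String keyDelimiter) {
        this.keyFieldIndices = keyFieldIndices;
        this.keyDelimiter = keyDelimiter;
    }

    // isUpsert == true inserts or replaces the document; false deletes it.
    void invoke(boolean isUpsert, String[] row) {
        String docId = documentId(row);
        if (isUpsert) {
            index.put(docId, row.clone());
        } else {
            index.remove(docId);
        }
    }

    // Joins the key fields of the row into a document id.
    String documentId(String[] row) {
        StringJoiner joiner = new StringJoiner(keyDelimiter);
        for (int i : keyFieldIndices) {
            joiner.add(row[i]);
        }
        return joiner.toString();
    }

    Map<String, String[]> index() {
        return index;
    }
}
```

Keying documents this way makes repeated updates for the same key idempotent: the latest row simply overwrites the earlier document.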
[jira] [Updated] (FLINK-3875) Add a TableSink for Elasticsearch
[ https://issues.apache.org/jira/browse/FLINK-3875?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-3875: -- Labels: pull-request-available (was: ) > Add a TableSink for Elasticsearch > - > > Key: FLINK-3875 > URL: https://issues.apache.org/jira/browse/FLINK-3875 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors, Table API & SQL >Reporter: Fabian Hueske >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > Add a TableSink that writes data to Elasticsearch
[jira] [Commented] (FLINK-9559) The type of a union of CHAR columns of different lengths should be VARCHAR
[ https://issues.apache.org/jira/browse/FLINK-9559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16591535#comment-16591535 ] ASF GitHub Bot commented on FLINK-9559: --- hequn8128 edited a comment on issue #6519: [FLINK-9559] [table] The type of a union of CHAR columns of different lengths should be VARCHAR URL: https://github.com/apache/flink/pull/6519#issuecomment-415735866 Hi @pnowojski, thanks for your reply. There are many cases that need this feature: not only `case when`, but also `nvl`, `greatest` and `least`. Most users encounter the blank problem with `case when`. Examples have been added in the test cases. Below are some more examples: 1. ``` SELECT country_name FROM ( SELECT CASE id WHEN 1 THEN 'GERMANY' WHEN 2 THEN 'CANADA' ELSE 'INVALID COUNTRY ID' END AS country_name FROM country_id ) WHERE country_name = 'GERMANY' ``` This SQL outputs nothing because of the blank problem, which is very confusing. 2. ``` SELECT country_name, country_info FROM ( SELECT CASE id WHEN 1 THEN 'GERMANY' WHEN 2 THEN 'CANADA' ELSE 'INVALID COUNTRY ID' END AS country_name FROM country_id ) nameTable join infoTable on nameTable.country_name = infoTable.country_name; ``` This SQL cannot join correctly because of the blank problem: 'GERMANY' in nameTable becomes 'GERMANY---', where '-' denotes a trailing blank. It is true that the SQL standard returns a CHAR type, but nearly all major DBMSs return VARCHAR without blank-padding. A tool for easy online testing: http://sqlfiddle.com/ Thanks, Hequn > The type of a union of CHAR columns of different lengths should be VARCHAR -- > > Key: FLINK-9559 > URL: https://issues.apache.org/jira/browse/FLINK-9559 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Hequn Cheng >Assignee: Hequn Cheng >Priority: Major > Labels: pull-request-available > > Currently, if a case-when expression has two branches that return string > literals, redundant white spaces will be appended to the shorter string literal. > For example, for the SQL: case 1 when 1 then 'a' when 2 then 'bcd' end, the > return value will be 'a  ' of CHAR(3) instead of 'a'. > Although this follows the behavior of the strict SQL standard mode (SQL:2003), we > should return the pragmatic type in real scenarios, without blank-padding. > Happily, this problem has been fixed by > [CALCITE-2321|https://issues.apache.org/jira/browse/CALCITE-2321], so we can > upgrade Calcite to the next release (1.17.0) and override > {{RelDataTypeSystem}} in Flink to configure the return type, i.e., making > {{shouldConvertRaggedUnionTypesToVarying()}} return true.
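The blank problem can be reproduced outside SQL. Under strict SQL:2003 typing, the CASE expression in the examples above gets the type CHAR(n), where n is the length of the longest branch (18 for 'INVALID COUNTRY ID'), so shorter results are padded with trailing blanks. The following plain-Java sketch models that padding with a hypothetical `CharPaddingSketch` class. It uses Java's exact `String.equals` as a stand-in for the comparison that fails in the report; it is not Flink or Calcite code.

```java
// Hypothetical illustration of CHAR(n) blank-padding versus VARCHAR; not Flink or Calcite code.
final class CharPaddingSketch {
    private CharPaddingSketch() {}

    // Length of the common CHAR type: the longest branch literal.
    static int commonCharLength(String... branches) {
        int max = 0;
        for (String b : branches) {
            max = Math.max(max, b.length());
        }
        return max;
    }

    // Pads `value` with trailing blanks up to `length`, as a CHAR(length) value would be.
    static String padToChar(String value, int length) {
        StringBuilder sb = new StringBuilder(value);
        while (sb.length() < length) {
            sb.append(' ');
        }
        return sb.toString();
    }
}
```

The padded value no longer equals the literal 'GERMANY', while the VARCHAR behaviour (no padding) that CALCITE-2321 enables keeps the value unchanged.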
[jira] [Commented] (FLINK-9559) The type of a union of CHAR columns of different lengths should be VARCHAR
[ https://issues.apache.org/jira/browse/FLINK-9559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16591533#comment-16591533 ] ASF GitHub Bot commented on FLINK-9559: --- hequn8128 edited a comment on issue #6519: [FLINK-9559] [table] The type of a union of CHAR columns of different lengths should be VARCHAR URL: https://github.com/apache/flink/pull/6519#issuecomment-415735866 Hi @pnowojski, thanks for your reply. There are many cases that need this feature: not only `case when`, but also `nvl`, `greatest` and `least`. `case when` is where most users run into the blank-padding problem. Examples have been added to the test cases. Below are some more examples: 1. ``` SELECT country_name FROM ( SELECT CASE id WHEN 1 THEN 'GERMANY' WHEN 2 THEN 'CANADA' ELSE 'INVALID COUNTRY ID' END AS country_name FROM country_id ) WHERE country_name = 'GERMANY' ``` This SQL outputs nothing because of the blank-padding problem, which is very confusing. 2. ``` SELECT nameTable.country_name, country_info FROM ( SELECT CASE id WHEN 1 THEN 'GERMANY' WHEN 2 THEN 'CANADA' ELSE 'INVALID COUNTRY ID' END AS country_name FROM country_id ) nameTable join infoTable on nameTable.country_name = infoTable.country_name; ``` This SQL cannot join correctly because of the blank-padding problem: 'GERMANY' in nameTable becomes 'GERMANY---' (here, '-' denotes a blank). It is true that the SQL standard returns a CHAR type, but nearly all major DBMSs return VARCHAR without blank padding. Thanks, Hequn This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] hequn8128 edited a comment on issue #6519: [FLINK-9559] [table] The type of a union of CHAR columns of different lengths should be VARCHAR
hequn8128 edited a comment on issue #6519: [FLINK-9559] [table] The type of a union of CHAR columns of different lengths should be VARCHAR URL: https://github.com/apache/flink/pull/6519#issuecomment-415735866 Hi @pnowojski, thanks for your reply. There are many cases that need this feature: not only `case when`, but also `nvl`, `greatest` and `least`. `case when` is where most users run into the blank-padding problem. Examples have been added to the test cases. Below are some more examples: 1. ``` SELECT country_name FROM ( SELECT CASE id WHEN 1 THEN 'GERMANY' WHEN 2 THEN 'CANADA' ELSE 'INVALID COUNTRY ID' END AS country_name FROM country_id ) WHERE country_name = 'GERMANY' ``` This SQL outputs nothing because of the blank-padding problem, which is very confusing. 2. ``` SELECT nameTable.country_name, country_info FROM ( SELECT CASE id WHEN 1 THEN 'GERMANY' WHEN 2 THEN 'CANADA' ELSE 'INVALID COUNTRY ID' END AS country_name FROM country_id ) nameTable join infoTable on nameTable.country_name = infoTable.country_name; ``` This SQL cannot join correctly because of the blank-padding problem: 'GERMANY' in nameTable becomes 'GERMANY---' (here, '-' denotes a blank). It is true that the SQL standard returns a CHAR type, but nearly all major DBMSs return VARCHAR without blank padding. Thanks, Hequn This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] hequn8128 commented on issue #6519: [FLINK-9559] [table] The type of a union of CHAR columns of different lengths should be VARCHAR
hequn8128 commented on issue #6519: [FLINK-9559] [table] The type of a union of CHAR columns of different lengths should be VARCHAR URL: https://github.com/apache/flink/pull/6519#issuecomment-415735866 Hi @pnowojski, thanks for your reply. There are many cases that need this feature: not only `case when`, but also `nvl`, `greatest` and `least`. `case when` is where most users run into the blank-padding problem. Examples have been added to the test cases. Below are some more examples: 1. ``` SELECT country_name FROM ( SELECT CASE id WHEN 1 THEN 'GERMANY' WHEN 2 THEN 'CANADA' ELSE 'INVALID COUNTRY ID' END AS country_name FROM country_id ) WHERE country_name = 'GERMANY' ``` This SQL outputs nothing because of the blank-padding problem, which is very confusing. 2. ``` SELECT nameTable.country_name, country_info FROM ( SELECT CASE id WHEN 1 THEN 'GERMANY' WHEN 2 THEN 'CANADA' ELSE 'INVALID COUNTRY ID' END AS country_name FROM country_id ) nameTable join infoTable on nameTable.country_name = infoTable.country_name; ``` This SQL cannot join correctly because of the blank-padding problem: 'GERMANY' in nameTable becomes 'GERMANY' followed by trailing blanks. It is true that the SQL standard returns a CHAR type, but nearly all major DBMSs return VARCHAR without blank padding. Thanks, Hequn This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-9559) The type of a union of CHAR columns of different lengths should be VARCHAR
[ https://issues.apache.org/jira/browse/FLINK-9559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16591530#comment-16591530 ] ASF GitHub Bot commented on FLINK-9559: --- hequn8128 commented on issue #6519: [FLINK-9559] [table] The type of a union of CHAR columns of different lengths should be VARCHAR URL: https://github.com/apache/flink/pull/6519#issuecomment-415735866 Hi @pnowojski, thanks for your reply. There are many cases that need this feature: not only `case when`, but also `nvl`, `greatest` and `least`. `case when` is where most users run into the blank-padding problem. Examples have been added to the test cases. Below are some more examples: 1. ``` SELECT country_name FROM ( SELECT CASE id WHEN 1 THEN 'GERMANY' WHEN 2 THEN 'CANADA' ELSE 'INVALID COUNTRY ID' END AS country_name FROM country_id ) WHERE country_name = 'GERMANY' ``` This SQL outputs nothing because of the blank-padding problem, which is very confusing. 2. ``` SELECT nameTable.country_name, country_info FROM ( SELECT CASE id WHEN 1 THEN 'GERMANY' WHEN 2 THEN 'CANADA' ELSE 'INVALID COUNTRY ID' END AS country_name FROM country_id ) nameTable join infoTable on nameTable.country_name = infoTable.country_name; ``` This SQL cannot join correctly because of the blank-padding problem: 'GERMANY' in nameTable becomes 'GERMANY' followed by trailing blanks. It is true that the SQL standard returns a CHAR type, but nearly all major DBMSs return VARCHAR without blank padding. Thanks, Hequn This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-10050) Support 'allowedLateness' in CoGroupedStreams
[ https://issues.apache.org/jira/browse/FLINK-10050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16591511#comment-16591511 ] eugen yushin edited comment on FLINK-10050 at 8/24/18 11:31 AM: [~aljoscha] No information about windows is retained for any consecutive operator in Flink. Docs: [https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/windows.html#working-with-window-results] {code:java} The result of a windowed operation is again a {{DataStream}}, no information about the windowed operations is retained in the result elements {code} At the same time, coGroup/join keeps the elements' timestamps, so consecutive operators can assign elements to the respective windows. Docs: [https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/joining.html#window-join] {code:java} Those elements that do get joined will have as their timestamp the largest timestamp that still lies in the respective window. For example a window with {{[5, 10)}} as its boundaries would result in the joined elements having 9 as their timestamp. {code} Business case: 2 streams, one with various business metrics and the other with similar metrics taken from microservice logs; the result is a reconciliation of these 2 streams. No operators other than a sink are needed for this particular business case. was (Author: eyushin): [~aljoscha] There's no info about windows for any consecutive operator in Flink. Docs: [https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/windows.html#working-with-window-results] {code:java} The result of a windowed operation is again a {{DataStream}}, no information about the windowed operations is retained in the result elements {code} At the same time, coGroup/join keeps the elements' timestamps, so consecutive operators can assign elements to the respective windows. 
Docs: [https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/joining.html#window-join] {code:java} Those elements that do get joined will have as their timestamp the largest timestamp that still lies in the respective window. For example a window with {{[5, 10)}} as its boundaries would result in the joined elements having 9 as their timestamp. {code} Business case: 2 streams, one with various business metrics and the other with similar metrics taken from microservice logs; the result is a reconciliation of these 2 streams. No operators other than a sink are needed for this particular business case. > Support 'allowedLateness' in CoGroupedStreams > - > > Key: FLINK-10050 > URL: https://issues.apache.org/jira/browse/FLINK-10050 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.5.1, 1.6.0 >Reporter: eugen yushin >Priority: Major > Labels: ready-to-commit, windows > > WindowedStream supports the 'allowedLateness' feature, while CoGroupedStreams does not. At the same time, WindowedStream is an inner part of CoGroupedStreams, and all main functionality (like evictor/trigger/...) is simply delegated to WindowedStream. > There is no way to process late-arriving data from previous steps in cogroups (and joins). Consider the following flow: > a. read data from source1 -> aggregate data with allowed lateness > b. read data from source2 -> aggregate data with allowed lateness > c. cogroup/join streams a and b, and compare aggregated values > Step c doesn't accept any late data from steps a/b due to the lack of an `allowedLateness` API call in CoGroupedStreams.java. > Scope: add method `WithWindow.allowedLateness` to the Java API (flink-streaming-java) and extend the Scala API (flink-streaming-scala). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
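The two timing rules this thread relies on can be sketched with a simplified Java model:

* A window join stamps its output with the window's largest timestamp. This is end - 1, so 9 for [5, 10).
* An element is dropped once its window's max timestamp plus the allowed lateness is at or below the current watermark.

The model assumes event-time tumbling windows with offset 0 and non-negative timestamps. It is based on the documented event-time semantics, not taken from Flink's code. `WindowTiming` and its methods are hypothetical. An allowed lateness of 0 stands in for step c, which cannot configure lateness.

```java
/**
 * Hypothetical, simplified model of event-time tumbling windows
 * (offset 0, non-negative timestamps); not Flink code.
 */
final class WindowTiming {

    /** Start of the tumbling window of the given size that contains the timestamp. */
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    /** Largest timestamp still inside [start, start + size): the join output timestamp. */
    static long maxTimestamp(long timestamp, long size) {
        return windowStart(timestamp, size) + size - 1;
    }

    /**
     * An element is dropped when its window's max timestamp plus the allowed
     * lateness is at or below the current watermark; allowedLateness = 0 models
     * a co-group that has no allowedLateness setting.
     */
    static boolean isDropped(long timestamp, long size, long allowedLateness, long watermark) {
        return maxTimestamp(timestamp, size) + allowedLateness <= watermark;
    }

    public static void main(String[] args) {
        long size = 5;
        // An element at 7 falls into [5, 10); the joined output gets timestamp 9,
        // which a downstream window of the same size assigns back to [5, 10).
        long joined = maxTimestamp(7, size);
        System.out.println("join output timestamp: " + joined
                + ", downstream window start: " + windowStart(joined, size));
        // The same element arriving when the watermark is already 11.
        System.out.println("dropped without lateness: " + isDropped(7, size, 0, 11));
        System.out.println("dropped with lateness 5: " + isDropped(7, size, 5, 11));
    }
}
```

With the watermark at 11, an element at timestamp 7 is dropped when the lateness is 0 but is still accepted with a lateness of 5. That is the gap the issue describes for step c.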
[jira] [Comment Edited] (FLINK-10050) Support 'allowedLateness' in CoGroupedStreams
[ https://issues.apache.org/jira/browse/FLINK-10050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16591511#comment-16591511 ] eugen yushin edited comment on FLINK-10050 at 8/24/18 11:30 AM: [~aljoscha] There's no info about windows for any consecutive operator in Flink. Docs: [https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/windows.html#working-with-window-results] {code:java} The result of a windowed operation is again a {{DataStream}}, no information about the windowed operations is retained in the result elements {code} At the same time, coGroup/join keeps the elements' timestamps, so consecutive operators can assign elements to the respective windows. Docs: [https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/joining.html#window-join] {code:java} Those elements that do get joined will have as their timestamp the largest timestamp that still lies in the respective window. For example a window with {{[5, 10)}} as its boundaries would result in the joined elements having 9 as their timestamp. {code} Business case: 2 streams, one with various business metrics and the other with similar metrics taken from microservice logs; the result is a reconciliation of these 2 streams. No operators other than a sink are needed for this particular business case. was (Author: eyushin): [~aljoscha] There's no info about windows for any operator in Flink. Docs: [https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/windows.html#working-with-window-results] {code} The result of a windowed operation is again a {{DataStream}}, no information about the windowed operations is retained in the result elements {code} At the same time, coGroup/join keeps the elements' timestamps, so consecutive operators can assign elements to the respective windows. 
Docs: [https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/joining.html#window-join] {code} Those elements that do get joined will have as their timestamp the largest timestamp that still lies in the respective window. For example a window with {{[5, 10)}} as its boundaries would result in the joined elements having 9 as their timestamp. {code} Business case: 2 streams, one with various business metrics and the other with similar metrics taken from microservice logs; the result is a reconciliation of these 2 streams. No operators other than a sink are needed for this particular business case. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-10050) Support 'allowedLateness' in CoGroupedStreams
[ https://issues.apache.org/jira/browse/FLINK-10050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16591511#comment-16591511 ] eugen yushin edited comment on FLINK-10050 at 8/24/18 11:29 AM: [~aljoscha] There's no info about windows for any operator in Flink. Docs: [https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/windows.html#working-with-window-results] {code} The result of a windowed operation is again a {{DataStream}}, no information about the windowed operations is retained in the result elements {code} At the same time, coGroup/join keeps the elements' timestamps, so consecutive operators can assign elements to the respective windows. Docs: [https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/joining.html#window-join] {code} Those elements that do get joined will have as their timestamp the largest timestamp that still lies in the respective window. For example a window with {{[5, 10)}} as its boundaries would result in the joined elements having 9 as their timestamp. {code} Business case: 2 streams, one with various business metrics and the other with similar metrics taken from microservice logs; the result is a reconciliation of these 2 streams. No operators other than a sink are needed for this particular business case. was (Author: eyushin): [~aljoscha] There's no info about windows for any operator in Flink. Docs: https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/windows.html#working-with-window-results ``` The result of a windowed operation is again a {{DataStream}}, no information about the windowed operations is retained in the result elements ``` At the same time, coGroup/join keeps the elements' timestamps, so consecutive operators can assign elements to the respective windows. 
Docs: [https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/joining.html#window-join] ``` Those elements that do get joined will have as their timestamp the largest timestamp that still lies in the respective window. For example a window with {{[5, 10)}} as its boundaries would result in the joined elements having 9 as their timestamp. ``` Business case: 2 streams, one with various business metrics and the other with similar metrics taken from microservice logs; the result is a reconciliation of these 2 streams. No operators other than a sink are needed for this particular business case. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10050) Support 'allowedLateness' in CoGroupedStreams
[ https://issues.apache.org/jira/browse/FLINK-10050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16591511#comment-16591511 ] eugen yushin commented on FLINK-10050: -- [~aljoscha] There's no info about windows for any operator in Flink. Docs: https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/windows.html#working-with-window-results ``` The result of a windowed operation is again a {{DataStream}}, no information about the windowed operations is retained in the result elements ``` At the same time, coGroup/join keeps the elements' timestamps, so consecutive operators can assign elements to the respective windows. Docs: [https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/joining.html#window-join] ``` Those elements that do get joined will have as their timestamp the largest timestamp that still lies in the respective window. For example a window with {{[5, 10)}} as its boundaries would result in the joined elements having 9 as their timestamp. ``` Business case: 2 streams, one with various business metrics and the other with similar metrics taken from microservice logs; the result is a reconciliation of these 2 streams. No operators other than a sink are needed for this particular business case. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7964) Add Apache Kafka 1.0/1.1 connectors
[ https://issues.apache.org/jira/browse/FLINK-7964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16591513#comment-16591513 ] ASF GitHub Bot commented on FLINK-7964: --- yanghua commented on a change in pull request #6577: [FLINK-7964] Add Apache Kafka 1.0/1.1 connectors URL: https://github.com/apache/flink/pull/6577#discussion_r212599001 ## File path: flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka10ITCase.java ## @@ -0,0 +1,353 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.StreamSink; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; + +import org.junit.BeforeClass; +import org.junit.Test; + +import javax.annotation.Nullable; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.Optional; + +/** + * IT cases for Kafka 1.0 . 
+ */ +public class Kafka10ITCase extends KafkaConsumerTestBase { Review comment: @pnowojski I tried to extend Kafka011ITCase: ```java public class Kafka10ITCase extends Kafka011ITCase ``` but its prepare method: ```java @BeforeClass public static void prepare() throws ClassNotFoundException { KafkaProducerTestBase.prepare();//here ((KafkaTestEnvironmentImpl) kafkaServer).setProducerSemantic(FlinkKafkaProducer011.Semantic.AT_LEAST_ONCE); } ``` triggers this method call chain: ```java protected static void startClusters(boolean secureMode, boolean hideKafkaBehindProxy) throws ClassNotFoundException { // dynamically load the implementation for the test // here: this loads KafkaTestEnvironmentImpl from connector 0.11, not 1.0 Class clazz = Class.forName("org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl"); kafkaServer = (KafkaTestEnvironment) InstantiationUtil.instantiate(clazz); LOG.info("Starting KafkaTestBase.prepare() for Kafka " + kafkaServer.getVersion()); ``` `startClusters` is a static method, `clazz` resolves to the KafkaTestEnvironmentImpl class from the 0.11 module, and we cannot use polymorphism to get an instance of the KafkaTestEnvironmentImpl from the 1.0 module. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add Apache Kafka 1.0/1.1 connectors > --- > > Key: FLINK-7964 > URL: https://issues.apache.org/jira/browse/FLINK-79
[GitHub] yanghua commented on a change in pull request #6577: [FLINK-7964] Add Apache Kafka 1.0/1.1 connectors
yanghua commented on a change in pull request #6577: [FLINK-7964] Add Apache Kafka 1.0/1.1 connectors URL: https://github.com/apache/flink/pull/6577#discussion_r212599001 ## File path: flink-connectors/flink-connector-kafka-1.0/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka10ITCase.java ## @@ -0,0 +1,353 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.StreamSink; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; + +import org.junit.BeforeClass; +import org.junit.Test; + +import javax.annotation.Nullable; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.Optional; + +/** + * IT cases for Kafka 1.0 . 
+ */ +public class Kafka10ITCase extends KafkaConsumerTestBase { Review comment: @pnowojski I tried to extend Kafka011ITCase: ```java public class Kafka10ITCase extends Kafka011ITCase ``` but its prepare method: ```java @BeforeClass public static void prepare() throws ClassNotFoundException { KafkaProducerTestBase.prepare();//here ((KafkaTestEnvironmentImpl) kafkaServer).setProducerSemantic(FlinkKafkaProducer011.Semantic.AT_LEAST_ONCE); } ``` triggers this method call chain: ```java protected static void startClusters(boolean secureMode, boolean hideKafkaBehindProxy) throws ClassNotFoundException { // dynamically load the implementation for the test // here: this loads KafkaTestEnvironmentImpl from connector 0.11, not 1.0 Class clazz = Class.forName("org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl"); kafkaServer = (KafkaTestEnvironment) InstantiationUtil.instantiate(clazz); LOG.info("Starting KafkaTestBase.prepare() for Kafka " + kafkaServer.getVersion()); ``` `startClusters` is a static method, `clazz` resolves to the KafkaTestEnvironmentImpl class from the 0.11 module, and we cannot use polymorphism to get an instance of the KafkaTestEnvironmentImpl from the 1.0 module. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
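The polymorphism problem comes from a Java language rule: static methods are hidden, not overridden. The sketch below uses hypothetical classes that are not Flink's test classes. The static `prepare()` in the base class always calls the base class's own static helper, even when it is invoked through the subclass. An instance-method hook, shown only for contrast and not as the design Flink chose, would dispatch to the subclass.

```java
/** Hypothetical stand-in for a test base with a static, @BeforeClass-style setup. */
class StaticBaseSketch {
    static String startedEnvironment;

    static String environmentVersion() {
        return "0.11";
    }

    // Bound at compile time: always calls StaticBaseSketch.environmentVersion().
    static void prepare() {
        startedEnvironment = environmentVersion();
    }
}

/** Subclass that tries to swap in the 1.0 environment. */
class StaticKafka10Sketch extends StaticBaseSketch {
    // This hides, but does not override, the base method.
    static String environmentVersion() {
        return "1.0";
    }
}

/** Contrast: with an instance-method hook, dynamic dispatch picks the subclass. */
abstract class InstanceBaseSketch {
    protected abstract String environmentVersion();

    String prepare() {
        return environmentVersion();
    }
}

class InstanceKafka10Sketch extends InstanceBaseSketch {
    @Override
    protected String environmentVersion() {
        return "1.0";
    }
}

class StaticDispatchDemo {
    public static void main(String[] args) {
        StaticKafka10Sketch.prepare(); // runs the inherited static prepare()
        System.out.println("static setup started: " + StaticBaseSketch.startedEnvironment);
        System.out.println("instance setup started: " + new InstanceKafka10Sketch().prepare());
    }
}
```

Running `StaticDispatchDemo` prints `static setup started: 0.11` and then `instance setup started: 1.0`.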
[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask
[ https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16591489#comment-16591489 ] 

JIN SUN commented on FLINK-10205:
-

In my opinion we need determinism, especially when a task is rerun: its output should be the same as that of the previous attempt. Today's logic might skip the split that the previous attempt processed and assign a different split, which leads to incorrect results.

Fabian, we don't restrict the InputSplit assignment; instead, by adding a small piece of code to Execution.java and ExecutionVertex.java, we can make it deterministic. I'm preparing the code, and we can have a further discussion when it is ready.

> Batch Job: InputSplit Fault tolerant for DataSourceTask
> ---
>
> Key: FLINK-10205
> URL: https://issues.apache.org/jira/browse/FLINK-10205
> Project: Flink
> Issue Type: Sub-task
> Components: JobManager
> Reporter: JIN SUN
> Priority: Major
> Original Estimate: 168h
> Remaining Estimate: 168h
>
> Today a DataSourceTask pulls InputSplits from the JobManager to achieve better
> performance. However, when a DataSourceTask fails and is rerun, it will not get
> the same splits as its previous attempt. This can introduce inconsistent
> results or even data corruption.
> Furthermore, if two executions run at the same time (in a batch
> scenario), these two executions should process the same splits.
> We need to fix the issue to make the inputs of a DataSourceTask
> deterministic. The proposal is to save all splits into the ExecutionVertex and
> have the DataSourceTask pull its splits from there.
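The proposal (record the splits on the ExecutionVertex and let each DataSourceTask attempt pull from there) can be illustrated with a small, hypothetical model. `DeterministicSplitAssigner`, `nextSplit`, and the integer split ids are invented for this sketch; they are not Flink's `InputSplitAssigner` API and not the patch to Execution.java and ExecutionVertex.java mentioned above. Splits are still drawn lazily from a shared pool, but once a split is handed out at a given position for a subtask it is recorded, so any later attempt of that subtask (a rerun or a concurrent execution) asking for the same position gets the same split.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: per-subtask split history kept in one place (standing in
// for the ExecutionVertex), replayed to every attempt of that subtask.
class DeterministicSplitAssigner {
    // Shared pool of not-yet-assigned split ids.
    private final Deque<Integer> pool = new ArrayDeque<>();
    // Splits already handed out, per subtask index, in assignment order.
    private final Map<Integer, List<Integer>> history = new HashMap<>();

    DeterministicSplitAssigner(List<Integer> splits) {
        pool.addAll(splits);
    }

    /**
     * Returns the split at position {@code index} for {@code subtask}, or null
     * when the input is exhausted. Recorded positions are replayed; only a new
     * position draws a fresh split from the shared pool.
     */
    synchronized Integer nextSplit(int subtask, int index) {
        List<Integer> assigned = history.computeIfAbsent(subtask, k -> new ArrayList<>());
        if (index < assigned.size()) {
            return assigned.get(index); // rerun or concurrent attempt: same split as before
        }
        if (index > assigned.size()) {
            throw new IllegalArgumentException("splits must be requested in order, starting at 0");
        }
        Integer split = pool.poll();
        if (split != null) {
            assigned.add(split); // remember it for future attempts of this subtask
        }
        return split;
    }
}

class SplitReplayDemo {
    public static void main(String[] args) {
        DeterministicSplitAssigner assigner = new DeterministicSplitAssigner(Arrays.asList(10, 11, 12, 13));
        // The first attempt of subtask 0 reads two splits, then fails.
        List<Integer> firstAttempt = Arrays.asList(assigner.nextSplit(0, 0), assigner.nextSplit(0, 1));
        // Meanwhile subtask 1 takes the next split from the pool.
        Integer other = assigner.nextSplit(1, 0);
        // The rerun of subtask 0 starts again at position 0 and sees the same splits.
        List<Integer> rerun = Arrays.asList(assigner.nextSplit(0, 0), assigner.nextSplit(0, 1));
        System.out.println(firstAttempt + " " + rerun + " " + other); // prints [10, 11] [10, 11] 12
    }
}
```

Because splits are still pulled on demand, fast subtasks can take more of the pool; the only new constraint is that a subtask's already-assigned sequence is fixed once recorded.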
[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask
[ https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16591484#comment-16591484 ] 

Fabian Hueske commented on FLINK-10205:
---

Oh, I didn't notice that this issue was created in the context of FLINK-4256. Please disregard my previous comment.