[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16375378#comment-16375378
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

GitHub user yanghua opened a pull request:

https://github.com/apache/flink/pull/5573

[FLINK-8756][Client] Support ClusterClient.getAccumulators() in 
RestClusterClient

## What is the purpose of the change

This PR adds support for `ClusterClient.getAccumulators()` in `RestClusterClient`.

## Brief change log

  - *Send a REST request to get the `JobAccumulatorsInfo` object*
  - *Use Jackson's `ObjectMapper` to convert the `JobAccumulatorsInfo` object to a `Map`*
  - *Add a test method to the `RestClusterClientTest` class to test the 
`getAccumulators` function*
  - *Add a test handler to mock the `JobAccumulatorsInfo` object*
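
The conversion step in the change log above can be pictured with a minimal sketch. It is not the PR's code: `AccumulatorEntry` and `toAccumulatorMap` are hypothetical stand-ins for the REST response type and the conversion, and a plain loop is swapped in for Jackson's `ObjectMapper` so that the example runs on the JDK alone.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class AccumulatorMapSketch {

    /** Hypothetical stand-in for one user accumulator in the REST response. */
    static final class AccumulatorEntry {
        final String name;
        final String type;
        final String value;

        AccumulatorEntry(String name, String type, String value) {
            this.name = name;
            this.type = type;
            this.value = value;
        }
    }

    /** Flattens the entries into a name -> value map, keeping the response order. */
    static Map<String, Object> toAccumulatorMap(List<AccumulatorEntry> entries) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (AccumulatorEntry entry : entries) {
            result.put(entry.name, entry.value);
        }
        return result;
    }

    public static void main(String[] args) {
        // Mocked response, in the spirit of the test handler described above.
        List<AccumulatorEntry> mocked = Arrays.asList(
            new AccumulatorEntry("records-in", "LongCounter", "42"),
            new AccumulatorEntry("records-out", "LongCounter", "40"));

        Map<String, Object> accumulators = toAccumulatorMap(mocked);

        // Same style of check as the described test: compare the sizes.
        System.out.println(accumulators.size() == mocked.size());
    }
}
```

The size comparison in `main` mirrors the verification described in this PR; a stricter test would also compare the individual values.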


## Verifying this change

This change added tests and can be verified as follows:

  - *Added a test that validates that the actual number of accumulators equals 
the number mocked in the test handler*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yanghua/flink FLINK-8756

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5573.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5573


commit ec8ef5d8ad6d650e250737d5005173994337168c
Author: vinoyang 
Date:   2018-02-24T06:50:55Z

[FLINK-8756][Client] Support ClusterClient.getAccumulators() in 
RestClusterClient




> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7641) Loss of JobManager in HA mode should not cause jobs to fail

2018-02-23 Thread vinoyang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16375201#comment-16375201
 ] 

vinoyang commented on FLINK-7641:
-

Yes [~elevy], you are right. Our implementation goes further than your 
description: we do not need to pause running jobs, and they keep running 
while the master JM fails over.

> Loss of JobManager in HA mode should not cause jobs to fail
> ---
>
> Key: FLINK-7641
> URL: https://issues.apache.org/jira/browse/FLINK-7641
> Project: Flink
>  Issue Type: Improvement
>  Components: JobManager
>Affects Versions: 1.3.2
>Reporter: Elias Levy
>Assignee: vinoyang
>Priority: Major
>
> Currently if a standalone cluster of JobManagers is configured in 
> high-availability mode and the master JM is lost, the job executing in the 
> cluster will be restarted.  This is less than ideal.  It would be best if the 
> jobs could continue to execute without restarting while one of the spare JMs 
> becomes the new master, or in the worst case, the jobs are paused while the 
> JM election takes place.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7641) Loss of JobManager in HA mode should not cause jobs to fail

2018-02-23 Thread Elias Levy (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16375179#comment-16375179
 ] 

Elias Levy commented on FLINK-7641:
---

I mean that if you have a standalone cluster in HA mode with multiple JMs, if 
the current master JM fails, any jobs executing in the cluster will be stopped 
and then restored by the new master JM.  Ideally master JM failover should be 
largely invisible to running jobs.  At most, they should be temporarily paused 
and continued, rather than stopped and restarted.

> Loss of JobManager in HA mode should not cause jobs to fail
> ---
>
> Key: FLINK-7641
> URL: https://issues.apache.org/jira/browse/FLINK-7641
> Project: Flink
>  Issue Type: Improvement
>  Components: JobManager
>Affects Versions: 1.3.2
>Reporter: Elias Levy
>Assignee: vinoyang
>Priority: Major
>
> Currently if a standalone cluster of JobManagers is configured in 
> high-availability mode and the master JM is lost, the job executing in the 
> cluster will be restarted.  This is less than ideal.  It would be best if the 
> jobs could continue to execute without restarting while one of the spare JMs 
> becomes the new master, or in the worst case, the jobs are paused while the 
> JM election takes place.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8761) Various improvements to the Quickstarts

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374912#comment-16374912
 ] 

ASF GitHub Bot commented on FLINK-8761:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5569
  
Thanks for checking this out!

Concerning the removed `flink-clients` dependency - that was done on 
purpose. Here is my motivation for that:
  - For the 'provided' API dependency, it should actually not be there.
  - It mainly matters to the in-Eclipse execution, which needs a 'provided' 
runtime.
  - I would prefer to remove it now, because the dependencies will probably 
change anyway in the future.

The proposal I want to make for the next Flink version is to have something 
like `flink-all` (or `flink-base`) which refers to exactly what is in the 
`flink-dist` jar (and `flink-dist` draws its jar from there). If we set that as 
a provided dependency everywhere (connector, library, quickstart), we have 
exactly everything provided that will be available through Flink's runtime, and 
nothing that will not be there. That is the easiest way to keep these things in sync.

Now I am unsure whether `flink-dist` can currently take a `flink-all`-like role, 
because it declares a lot of additional provided dependencies for the sake of 
putting them into `opt` or `examples`. According to Maven's dependency 
management, a transitive provided dependency of a provided dependency is not 
propagated (in which case it would be okay to use `flink-dist`), but I am not 
sure we want to rely on that across the Maven command line, IntelliJ, Eclipse, etc.

The last sentence incidentally just made me realize that I should probably 
change the `flink-dist` dependency in the IDEA profile to 
`flink-streaming-java` / `flink-clients` for exactly that reason... 
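
The Maven rule referred to in the comment above (a provided dependency of a provided dependency is not propagated) comes from Maven's documented scope table for transitive dependencies. As a rough illustration, that table can be written as a small function. The class and method names are made up for this sketch, and it covers only the four scopes compile, provided, runtime and test.

```java
import java.util.Optional;

public class ScopeMediationSketch {

    enum Scope { COMPILE, PROVIDED, RUNTIME, TEST }

    /**
     * Scope with which a transitive dependency reaches the current project,
     * or empty if it is omitted.
     *
     * @param direct     scope of the direct dependency in the current project
     * @param transitive scope that dependency declares for its own dependency
     */
    static Optional<Scope> effectiveScope(Scope direct, Scope transitive) {
        switch (transitive) {
            case PROVIDED:
            case TEST:
                // Never propagated, whatever the scope of the direct dependency.
                return Optional.empty();
            case COMPILE:
                // Inherits the scope of the direct dependency.
                return Optional.of(direct);
            case RUNTIME:
                // Stays runtime under compile, otherwise inherits the direct scope.
                return Optional.of(direct == Scope.COMPILE ? Scope.RUNTIME : direct);
            default:
                throw new IllegalArgumentException("Unknown scope: " + transitive);
        }
    }

    public static void main(String[] args) {
        // A 'provided' artifact that itself declares 'provided' extras:
        System.out.println(effectiveScope(Scope.PROVIDED, Scope.PROVIDED).isPresent());
        // A 'provided' artifact with a regular compile dependency:
        System.out.println(effectiveScope(Scope.PROVIDED, Scope.COMPILE).get());
    }
}
```

Whether IDE importers apply exactly the same table is the open question raised in the comment; the sketch only restates what the Maven documentation specifies.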


> Various improvements to the Quickstarts
> ---
>
> Key: FLINK-8761
> URL: https://issues.apache.org/jira/browse/FLINK-8761
> Project: Flink
>  Issue Type: Improvement
>  Components: Quickstarts
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
> Fix For: 1.5.0
>
>
> Various improvements to the Quickstarts to give a smoother out of the box 
> experience.
> Broken down into the subtasks.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5569: [FLINK-8761] [quickstarts] Big improvements to the...

2018-02-23 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5569#discussion_r170351589
  
--- Diff: 
flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
 ---
@@ -50,181 +51,113 @@ under the License.


 
-   
-


-   
-   org.apache.flink
-   flink-core
-   ${flink.version}
-   
+   

org.apache.flink
flink-java
${flink.version}
+   provided


-   
org.apache.flink
-   
flink-clients_${scala.binary.version}
+   
flink-streaming-java_${scala.binary.version}
${flink.version}
+   provided

+
+   
+
+   
 
-   
+   
+   

org.slf4j
slf4j-log4j12
-   ${slf4j.version}
+   1.7.7
+   runtime


log4j
log4j
-   ${log4j.version}
+   1.2.17
+   runtime


 
-   
-   
-   
-   build-jar
-
-   
-   false
-   
-
-   
-   
-   org.apache.flink
-   flink-core
-   ${flink.version}
-   provided
-   
-   
-   org.apache.flink
-   flink-java
-   ${flink.version}
-   provided
-   
-   
-   org.apache.flink
-   
flink-clients_${scala.binary.version}
-   ${flink.version}
-   provided
-   
-   
-   org.apache.flink
-   
flink-streaming-java_${scala.binary.version}
-   ${flink.version}
-   provided
-   
-   
-   org.slf4j
-   slf4j-log4j12
-   ${slf4j.version}
-   provided
-   
-   
-   log4j
-   log4j
-   ${log4j.version}
-   provided
-   
-   
-
-   
-   
-   
-   
-   
org.apache.maven.plugins
-   
maven-shade-plugin
-   3.0.0
-   
-   
-   
-   
package
-   
-   
shade
-   
-   
-   

-   

-   
org.apache.flink:force-shading
-   
com.google.code.findbugs:jsr305
-   
org.slf4j:*
-   

-   

[GitHub] flink pull request #5569: [FLINK-8761] [quickstarts] Big improvements to the...

2018-02-23 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5569#discussion_r170351448
  
--- Diff: 
flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
 ---
@@ -50,181 +51,113 @@ under the License.


 
-   
-


-   
-   org.apache.flink
-   flink-core
-   ${flink.version}
-   
+   

org.apache.flink
flink-java
${flink.version}
+   provided


-   
org.apache.flink
-   
flink-clients_${scala.binary.version}
+   
flink-streaming-java_${scala.binary.version}
${flink.version}
+   provided

+
+   
+
+   

[jira] [Commented] (FLINK-8746) Support rescaling of jobs which are not fully running

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8746?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374695#comment-16374695
 ] 

ASF GitHub Bot commented on FLINK-8746:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5560#discussion_r170314325
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointTriggerException.java
 ---
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Exceptions which indicate that a checkpoint triggering has failed.
+ *
+ */
+public class CheckpointTriggerException extends FlinkException {
+
+   private static final long serialVersionUID = -3330160816161901752L;
+
+   private final CheckpointDeclineReason checkpointDeclineReason;
+
+   public CheckpointTriggerException(String message, CheckpointDeclineReason checkpointDeclineReason) {
+   super(message + " Decline reason: " + checkpointDeclineReason);
--- End diff --

True, this is better. Will fix it.
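
The diff stops at the reviewed line, so the rest of the class is not shown. A minimal sketch of how such an exception can carry a typed decline reason follows. It makes several assumptions: `DeclineReason` is a stand-in enum (its `OTHER` value is made up), the class extends `Exception` rather than `FlinkException`, and the null check is a guess prompted by the `Preconditions` import in the diff.

```java
import java.util.Objects;

public class CheckpointTriggerExceptionSketch {

    /** Stand-in for Flink's CheckpointDeclineReason; OTHER is a made-up second value. */
    enum DeclineReason { NOT_ALL_REQUIRED_TASKS_RUNNING, OTHER }

    /** Extends Exception so the sketch compiles on the JDK alone. */
    static class TriggerException extends Exception {
        private static final long serialVersionUID = 1L;

        private final DeclineReason declineReason;

        TriggerException(String message, DeclineReason declineReason) {
            // Same message layout as the last line shown in the diff.
            super(message + " Decline reason: " + declineReason);
            // Assumed null check, using the JDK instead of Flink's Preconditions.
            this.declineReason = Objects.requireNonNull(declineReason);
        }

        DeclineReason getCheckpointDeclineReason() {
            return declineReason;
        }
    }

    public static void main(String[] args) {
        TriggerException e = new TriggerException(
            "Could not trigger savepoint.", DeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
        // Callers can branch on the typed reason instead of parsing the message.
        System.out.println(e.getCheckpointDeclineReason());
        System.out.println(e.getMessage());
    }
}
```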


> Support rescaling of jobs which are not fully running
> -
>
> Key: FLINK-8746
> URL: https://issues.apache.org/jira/browse/FLINK-8746
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> We should support the rescaling of jobs which are only partially running. 
> Currently, this fails because rescaling requires taking a savepoint. We can 
> solve the problem by falling back to the latest rescaling savepoint.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8746) Support rescaling of jobs which are not fully running

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8746?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374696#comment-16374696
 ] 

ASF GitHub Bot commented on FLINK-8746:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5560#discussion_r170314390
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
---
@@ -534,42 +536,94 @@ public void postStop() throws Exception {
 
// 4. take a savepoint
final CompletableFuture<String> savepointFuture = triggerSavepoint(
-   jobMasterConfiguration.getTmpDirectory(),
-   timeout);
+   null,
+   timeout)
+   .handleAsync(
+       (String savepointPath, Throwable throwable) -> {
+           if (throwable != null) {
+               final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
+               if (strippedThrowable instanceof CheckpointTriggerException) {
+                   final CheckpointTriggerException checkpointTriggerException = (CheckpointTriggerException) strippedThrowable;
+
+                   if (checkpointTriggerException.getCheckpointDeclineReason() == CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING) {
+                       return lastInternalSavepoint;
+                   } else {
+                       throw new CompletionException(checkpointTriggerException);
+                   }
+               } else {
+                   throw new CompletionException(strippedThrowable);
+               }
+           } else {
+               final String savepointToDispose = lastInternalSavepoint;
--- End diff --

You're totally right. Will add a guard.
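
The fallback in the diff above can be reduced to a small JDK-only sketch. `TasksNotRunningException`, `strip` and `withFallback` are hypothetical names for this illustration, and the null check on the fallback savepoint is only one possible guard, not necessarily the one meant in the reply.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class SavepointFallbackSketch {

    /** Hypothetical marker for "not all required tasks are running". */
    static class TasksNotRunningException extends Exception {
        private static final long serialVersionUID = 1L;

        TasksNotRunningException(String message) {
            super(message);
        }
    }

    /** Unwraps nested CompletionExceptions to reach the original cause. */
    static Throwable strip(Throwable t) {
        while (t instanceof CompletionException && t.getCause() != null) {
            t = t.getCause();
        }
        return t;
    }

    /** Uses the fresh savepoint if there is one, else falls back to the last one. */
    static CompletableFuture<String> withFallback(
            CompletableFuture<String> savepointFuture, String lastSavepoint) {
        return savepointFuture.handleAsync((String savepointPath, Throwable throwable) -> {
            if (throwable == null) {
                return savepointPath;
            }
            Throwable stripped = strip(throwable);
            // Guard: fall back only if an earlier savepoint actually exists.
            if (stripped instanceof TasksNotRunningException && lastSavepoint != null) {
                return lastSavepoint;
            }
            throw new CompletionException(stripped);
        });
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new TasksNotRunningException("job only partially running"));
        System.out.println(withFallback(failed, "savepoint-1").join());
    }
}
```

Any other failure, or a missing fallback, is rethrown wrapped in a `CompletionException`, so the caller still sees the original cause.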


> Support rescaling of jobs which are not fully running
> -
>
> Key: FLINK-8746
> URL: https://issues.apache.org/jira/browse/FLINK-8746
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> We should support the rescaling of jobs which are only partially running. 
> Currently, this fails because rescaling requires taking a savepoint. We can 
> solve the problem by falling back to the latest rescaling savepoint.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8746) Support rescaling of jobs which are not fully running

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8746?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374693#comment-16374693
 ] 

ASF GitHub Bot commented on FLINK-8746:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5560#discussion_r170314219
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
 ---
@@ -289,7 +290,7 @@ public void invoke() throws Exception {
 
// wait forever (until canceled)
synchronized (this) {
-   while (error == null && lastCheckpointId < numCalls) {
--- End diff --

The only places where `notify` was called are those where the error is set to a 
non-null value. Furthermore, I think the testing task is not intended to 
complete when `lastCheckpointId >= numCalls`. Alternatively, one could fix the 
problem by performing the `lastCheckpointId` update, as well as the trigger of 
`awaitLatch`, under the lock.
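
The point about `wait`/`notify` can be illustrated with a small guarded-wait sketch. It is not the test's code: `GuardedWaitSketch` and its methods are hypothetical, and it shows the variant in which every state change happens under the same lock and wakes the waiter, while only an error ends the wait.

```java
public class GuardedWaitSketch {

    private Throwable error;        // guarded by "this"
    private long lastCheckpointId;  // guarded by "this"

    /** Blocks until an error is reported; checkpoint progress alone never ends the wait. */
    public synchronized Throwable awaitError() throws InterruptedException {
        while (error == null) {
            wait();
        }
        return error;
    }

    /** Records progress under the lock and wakes any waiter so it re-checks its condition. */
    public synchronized void checkpointTriggered(long checkpointId) {
        lastCheckpointId = checkpointId;
        notifyAll();
    }

    /** Reports a failure under the lock; this is what releases the waiter. */
    public synchronized void fail(Throwable t) {
        error = t;
        notifyAll();
    }

    public synchronized long getLastCheckpointId() {
        return lastCheckpointId;
    }

    /** Starts a waiter, reports progress, then an error; returns the message the waiter saw. */
    static String runDemo() {
        final GuardedWaitSketch task = new GuardedWaitSketch();
        final String[] seen = new String[1];
        Thread waiter = new Thread(() -> {
            try {
                seen[0] = task.awaitError().getMessage();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        // If the waiter is already waiting, it wakes up, re-checks, and waits again.
        task.checkpointTriggered(3L);
        task.fail(new IllegalStateException("boom"));
        try {
            waiter.join(5000L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

Because the condition is re-checked in a loop while holding the lock, it does not matter whether `fail` runs before or after the waiter reaches `wait()`.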


> Support rescaling of jobs which are not fully running
> -
>
> Key: FLINK-8746
> URL: https://issues.apache.org/jira/browse/FLINK-8746
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> We should support the rescaling of jobs which are only partially running. 
> Currently, this fails because rescaling requires taking a savepoint. We can 
> solve the problem by falling back to the latest rescaling savepoint.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8746) Support rescaling of jobs which are not fully running

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8746?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374692#comment-16374692
 ] 

ASF GitHub Bot commented on FLINK-8746:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5560#discussion_r170313695
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
---
@@ -1156,6 +1213,26 @@ private void disposeSavepoint(String savepointPath) {
}
}
 
+   /**
+    * Tries to restore the given {@link ExecutionGraph} from the provided {@link SavepointRestoreSettings}.
+    *
+    * @param executionGraphToRestore {@link ExecutionGraph} which is supposed to be restored
+    * @param savepointRestoreSettings {@link SavepointRestoreSettings} containing information about the savepoint to restore from
+    * @throws Exception if the {@link ExecutionGraph} could not be restored
+    */
+   private void tryRestoreExecutionGraphFromSavepoint(ExecutionGraph executionGraphToRestore, SavepointRestoreSettings savepointRestoreSettings) throws Exception {
+       if (savepointRestoreSettings.restoreSavepoint()) {
+           final CheckpointCoordinator checkpointCoordinator = executionGraphToRestore.getCheckpointCoordinator();
+           if (checkpointCoordinator != null) {
--- End diff --

The check is not done at every call site. Only the constructor has 
this check.


> Support rescaling of jobs which are not fully running
> -
>
> Key: FLINK-8746
> URL: https://issues.apache.org/jira/browse/FLINK-8746
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> We should support the rescaling of jobs which are only partially running. 
> Currently, this fails because rescaling requires taking a savepoint. We can 
> solve the problem by falling back to the latest rescaling savepoint.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8771) Upgrade scalastyle to 1.0.0

2018-02-23 Thread Ted Yu (JIRA)
Ted Yu created FLINK-8771:
-

 Summary: Upgrade scalastyle to 1.0.0
 Key: FLINK-8771
 URL: https://issues.apache.org/jira/browse/FLINK-8771
 Project: Flink
  Issue Type: Improvement
  Components: Build System
Reporter: Ted Yu


scalastyle 1.0.0 fixes issues with import order, explicit types for public 
methods, line length limitation and comment validation.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374620#comment-16374620
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170304145
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorTestBase.scala
 ---
@@ -18,37 +18,85 @@
 
 package org.apache.flink.table.descriptors
 
+import org.apache.flink.util.Preconditions
 import org.junit.Assert.assertEquals
+import org.junit.Test
+
+import scala.collection.JavaConverters._
 
 abstract class DescriptorTestBase {
 
   /**
-* Returns a valid descriptor.
+* Returns a set of valid descriptors.
+* This method is implemented in both Scala and Java.
+*/
+  def descriptors(): java.util.List[Descriptor]
+
+  /**
+* Returns a set of properties for each valid descriptor.
+* This code is implemented in both Scala and Java.
 */
-  def descriptor(): Descriptor
+  def properties(): java.util.List[java.util.Map[String, String]]
 
   /**
-* Returns a validator that can validate this descriptor.
+* Returns a validator that can validate all valid descriptors.
 */
   def validator(): DescriptorValidator
 
-  def verifyProperties(descriptor: Descriptor, expected: Seq[(String, 
String)]): Unit = {
+  @Test
+  def testValidation(): Unit = {
+val d = descriptors().asScala
+val p = properties().asScala
+
+Preconditions.checkArgument(d.length == p.length)
+
+d.zip(p).foreach { case (desc, props) =>
+  verifyProperties(desc, props.asScala.toMap)
+}
+  }
+
+  def verifyProperties(descriptor: Descriptor, expected: Map[String, 
String]): Unit = {
 val normProps = new DescriptorProperties
 descriptor.addProperties(normProps)
-assertEquals(expected.toMap, normProps.asMap)
+assertEquals(expected, normProps.asScalaMap)
   }
 
-  def verifyInvalidProperty(property: String, invalidValue: String): Unit 
= {
+  def verifyInvalidProperty(
+  descriptor: Descriptor,
+  property: String,
+  invalidValue: String): Unit = {
 val properties = new DescriptorProperties
-descriptor().addProperties(properties)
+descriptor.addProperties(properties)
 properties.unsafePut(property, invalidValue)
 validator().validate(properties)
   }
 
-  def verifyMissingProperty(removeProperty: String): Unit = {
+  def verifyMissingProperty(descriptor: Descriptor, removeProperty: 
String): Unit = {
 val properties = new DescriptorProperties
-descriptor().addProperties(properties)
+descriptor.addProperties(properties)
 properties.unsafeRemove(removeProperty)
 validator().validate(properties)
   }
 }
+
+class TestTableSourceDescriptor(connector: ConnectorDescriptor)
+  extends TableSourceDescriptor(connector) {
--- End diff --

Sorry about that. I forgot to rebuild after rebasing.


> Add a Kafka table source factory with JSON format support
> -
>
> Key: FLINK-8538
> URL: https://issues.apache.org/jira/browse/FLINK-8538
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Xingcan Cui
>Priority: Major
>
> Similar to CSVTableSourceFactory a Kafka table source factory for JSON should 
> be added. This issue includes improving the existing JSON descriptor with 
> validation that can be used for other connectors as well. It is up for 
> discussion if we want to split the KafkaJsonTableSource into connector and 
> format such that we can reuse the format for other table sources as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8746) Support rescaling of jobs which are not fully running

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8746?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374600#comment-16374600
 ] 

ASF GitHub Bot commented on FLINK-8746:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5560#discussion_r170300254
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
---
@@ -534,42 +536,94 @@ public void postStop() throws Exception {
 
// 4. take a savepoint
final CompletableFuture savepointFuture = 
triggerSavepoint(
-   jobMasterConfiguration.getTmpDirectory(),
-   timeout);
+   null,
+   timeout)
+   .handleAsync(
+   (String savepointPath, Throwable throwable) -> {
+   if (throwable != null) {
+   final Throwable 
strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
+   if (strippedThrowable 
instanceof CheckpointTriggerException) {
+   final 
CheckpointTriggerException checkpointTriggerException = 
(CheckpointTriggerException) strippedThrowable;
+
+   if 
(checkpointTriggerException.getCheckpointDeclineReason() == 
CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING) {
+   return 
lastInternalSavepoint;
+   } else {
+   throw new 
CompletionException(checkpointTriggerException);
+   }
+   } else {
+   throw new 
CompletionException(strippedThrowable);
+   }
+   } else {
+   final String savepointToDispose 
= lastInternalSavepoint;
--- End diff --

I think `savepointToDispose` can be `null`.
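
To illustrate the concern, here is a minimal, self-contained sketch of the fallback with a null guard. It is not the code from the pull request: the class `SavepointFallbackSketch`, the exception `NotAllTasksRunningException`, and the `disposed` list are hypothetical stand-ins, and what the real method does after the truncated line of the diff is an assumption. It also uses `handle` instead of `handleAsync` so that the example runs deterministically.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class SavepointFallbackSketch {

    /** Hypothetical stand-in for a trigger failure with reason NOT_ALL_REQUIRED_TASKS_RUNNING. */
    static final class NotAllTasksRunningException extends RuntimeException {
        NotAllTasksRunningException() {
            super("Not all required tasks are running.");
        }
    }

    // Null until the first internal savepoint has completed.
    private String lastInternalSavepoint;

    // Records disposed paths; stands in for a real dispose call (assumption).
    final List<String> disposed = new ArrayList<>();

    CompletableFuture<String> withFallback(CompletableFuture<String> savepointFuture) {
        return savepointFuture.handle((savepointPath, throwable) -> {
            if (throwable != null) {
                // Unwrap a CompletionException if the failure came from an earlier stage.
                Throwable cause = (throwable instanceof CompletionException && throwable.getCause() != null)
                    ? throwable.getCause()
                    : throwable;
                if (cause instanceof NotAllTasksRunningException) {
                    // Fall back to the latest internal savepoint (may itself be null).
                    return lastInternalSavepoint;
                }
                throw new CompletionException(cause);
            }
            final String savepointToDispose = lastInternalSavepoint;
            lastInternalSavepoint = savepointPath;
            // On the first successful savepoint there is nothing to dispose yet.
            if (savepointToDispose != null) {
                disposed.add(savepointToDispose);
            }
            return savepointPath;
        });
    }
}
```

Without the guard, the first successful savepoint would pass `null` to whatever disposes the previous one.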


> Support rescaling of jobs which are not fully running
> -
>
> Key: FLINK-8746
> URL: https://issues.apache.org/jira/browse/FLINK-8746
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> We should support the rescaling of jobs which are only partially running. 
> Currently, this fails because rescaling requires taking a savepoint. We can 
> solve the problem by falling back to the latest rescaling savepoint.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8746) Support rescaling of jobs which are not fully running

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8746?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374603#comment-16374603
 ] 

ASF GitHub Bot commented on FLINK-8746:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5560#discussion_r170279814
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
---
@@ -1156,6 +1213,26 @@ private void disposeSavepoint(String savepointPath) {
}
}
 
+   /**
+* Tries to restore the given {@link ExecutionGraph} from the provided 
{@link SavepointRestoreSettings}.
+*
+* @param executionGraphToRestore {@link ExecutionGraph} which is 
supposed to be restored
+* @param savepointRestoreSettings {@link SavepointRestoreSettings} 
containing information about the savepoint to restore from
+* @throws Exception if the {@link ExecutionGraph} could not be restored
+*/
+   private void tryRestoreExecutionGraphFromSavepoint(ExecutionGraph 
executionGraphToRestore, SavepointRestoreSettings savepointRestoreSettings) 
throws Exception {
+   if (savepointRestoreSettings.restoreSavepoint()) {
+   final CheckpointCoordinator checkpointCoordinator = 
executionGraphToRestore.getCheckpointCoordinator();
+   if (checkpointCoordinator != null) {
--- End diff --

I think at this point the `checkpointCoordinator` should not be `null` 
(there already are checks before). Maybe replace it with a `checkState`.
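
As a rough sketch of that suggestion: the local `checkState` helper below stands in for Flink's `Preconditions.checkState`, and `CheckStateSketch`, `restore`, and the error message are made up for illustration only.

```java
final class CheckStateSketch {

    /** Minimal stand-in for a checkState-style precondition helper. */
    static void checkState(boolean condition, String errorMessage) {
        if (!condition) {
            throw new IllegalStateException(errorMessage);
        }
    }

    /** Hypothetical restore step that expects a non-null coordinator. */
    static String restore(Object checkpointCoordinator, String savepointPath) {
        // Fail fast instead of silently skipping the restore when the coordinator is missing.
        checkState(checkpointCoordinator != null, "Checkpoint coordinator must not be null.");
        return "restoring from " + savepointPath;
    }
}
```

The difference from an `if (checkpointCoordinator != null)` block is that a violated assumption surfaces as an `IllegalStateException` rather than as a restore that quietly does nothing.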


> Support rescaling of jobs which are not fully running
> -
>
> Key: FLINK-8746
> URL: https://issues.apache.org/jira/browse/FLINK-8746
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> We should support the rescaling of jobs which are only partially running. 
> Currently, this fails because rescaling requires taking a savepoint. We can 
> solve the problem by falling back to the latest rescaling savepoint.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8746) Support rescaling of jobs which are not fully running

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8746?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374602#comment-16374602
 ] 

ASF GitHub Bot commented on FLINK-8746:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5560#discussion_r170281192
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
 ---
@@ -289,7 +290,7 @@ public void invoke() throws Exception {
 
// wait forever (until canceled)
synchronized (this) {
-   while (error == null && lastCheckpointId < 
numCalls) {
--- End diff --

Are you sure this needed to be removed?


> Support rescaling of jobs which are not fully running
> -
>
> Key: FLINK-8746
> URL: https://issues.apache.org/jira/browse/FLINK-8746
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> We should support the rescaling of jobs which are only partially running. 
> Currently, this fails because rescaling requires taking a savepoint. We can 
> solve the problem by falling back to the latest rescaling savepoint.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8746) Support rescaling of jobs which are not fully running

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8746?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374601#comment-16374601
 ] 

ASF GitHub Bot commented on FLINK-8746:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5560#discussion_r170296322
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointTriggerException.java
 ---
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Exceptions which indicate that a checkpoint triggering has failed.
+ *
+ */
+public class CheckpointTriggerException extends FlinkException {
+
+   private static final long serialVersionUID = -3330160816161901752L;
+
+   private final CheckpointDeclineReason checkpointDeclineReason;
+
+   public CheckpointTriggerException(String message, 
CheckpointDeclineReason checkpointDeclineReason) {
+   super(message + " Decline reason: " + checkpointDeclineReason);
--- End diff --

Not super important because it is never logged, but you could include the 
enum's `.message()` in the exception message.
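
A small sketch of what that could look like. The enum and exception here are simplified, hypothetical stand-ins for `CheckpointDeclineReason` and `CheckpointTriggerException`, and the message text is invented for the example.

```java
import java.util.Objects;

final class DeclineReasonSketch {

    /** Simplified stand-in for an enum that carries a human-readable message. */
    enum DeclineReason {
        NOT_ALL_REQUIRED_TASKS_RUNNING("Not all required tasks are currently running.");

        private final String message;

        DeclineReason(String message) {
            this.message = message;
        }

        String message() {
            return message;
        }
    }

    /** Simplified stand-in for the exception under review. */
    static final class TriggerException extends Exception {
        private static final long serialVersionUID = 1L;

        private final DeclineReason reason;

        TriggerException(String message, DeclineReason reason) {
            // Use the enum's message rather than its constant name.
            super(message + " Decline reason: " + Objects.requireNonNull(reason).message());
            this.reason = reason;
        }

        DeclineReason getReason() {
            return reason;
        }
    }
}
```

Concatenating the enum constant directly would put `NOT_ALL_REQUIRED_TASKS_RUNNING` into the message, whereas `message()` yields the descriptive sentence.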


> Support rescaling of jobs which are not fully running
> -
>
> Key: FLINK-8746
> URL: https://issues.apache.org/jira/browse/FLINK-8746
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> We should support the rescaling of jobs which are only partially running. 
> Currently, this fails because rescaling requires taking a savepoint. We can 
> solve the problem by falling back to the latest rescaling savepoint.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8746) Support rescaling of jobs which are not fully running

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8746?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374604#comment-16374604
 ] 

ASF GitHub Bot commented on FLINK-8746:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5560#discussion_r170296168
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointTriggerException.java
 ---
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Exceptions which indicate that a checkpoint triggering has failed.
+ *
+ */
+public class CheckpointTriggerException extends FlinkException {
+
+   private static final long serialVersionUID = -3330160816161901752L;
+
+   private final CheckpointDeclineReason checkpointDeclineReason;
+
+   public CheckpointTriggerException(String message, 
CheckpointDeclineReason checkpointDeclineReason) {
+   super(message + " Decline reason: " + checkpointDeclineReason);
--- End diff --

Not super important because it is never logged, but you could include the 
enum's `.message()` in the exception message.


> Support rescaling of jobs which are not fully running
> -
>
> Key: FLINK-8746
> URL: https://issues.apache.org/jira/browse/FLINK-8746
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> We should support the rescaling of jobs which are only partially running. 
> Currently, this fails because rescaling requires taking a savepoint. We can 
> solve the problem by falling back to the latest rescaling savepoint.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374593#comment-16374593
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170297941
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorTestBase.scala
 ---
@@ -18,37 +18,85 @@
 
 package org.apache.flink.table.descriptors
 
+import org.apache.flink.util.Preconditions
 import org.junit.Assert.assertEquals
+import org.junit.Test
+
+import scala.collection.JavaConverters._
 
 abstract class DescriptorTestBase {
 
   /**
-* Returns a valid descriptor.
+* Returns a set of valid descriptors.
+* This method is implemented in both Scala and Java.
+*/
+  def descriptors(): java.util.List[Descriptor]
+
+  /**
+* Returns a set of properties for each valid descriptor.
+* This code is implemented in both Scala and Java.
 */
-  def descriptor(): Descriptor
+  def properties(): java.util.List[java.util.Map[String, String]]
 
   /**
-* Returns a validator that can validate this descriptor.
+* Returns a validator that can validate all valid descriptors.
 */
   def validator(): DescriptorValidator
 
-  def verifyProperties(descriptor: Descriptor, expected: Seq[(String, 
String)]): Unit = {
+  @Test
+  def testValidation(): Unit = {
+val d = descriptors().asScala
+val p = properties().asScala
+
+Preconditions.checkArgument(d.length == p.length)
+
+d.zip(p).foreach { case (desc, props) =>
+  verifyProperties(desc, props.asScala.toMap)
+}
+  }
+
+  def verifyProperties(descriptor: Descriptor, expected: Map[String, 
String]): Unit = {
 val normProps = new DescriptorProperties
 descriptor.addProperties(normProps)
-assertEquals(expected.toMap, normProps.asMap)
+assertEquals(expected, normProps.asScalaMap)
   }
 
-  def verifyInvalidProperty(property: String, invalidValue: String): Unit 
= {
+  def verifyInvalidProperty(
+  descriptor: Descriptor,
+  property: String,
+  invalidValue: String): Unit = {
 val properties = new DescriptorProperties
-descriptor().addProperties(properties)
+descriptor.addProperties(properties)
 properties.unsafePut(property, invalidValue)
 validator().validate(properties)
   }
 
-  def verifyMissingProperty(removeProperty: String): Unit = {
+  def verifyMissingProperty(descriptor: Descriptor, removeProperty: 
String): Unit = {
--- End diff --

rename to `removePropertyAndVerify()`


> Add a Kafka table source factory with JSON format support
> -
>
> Key: FLINK-8538
> URL: https://issues.apache.org/jira/browse/FLINK-8538
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Xingcan Cui
>Priority: Major
>
> Similar to CSVTableSourceFactory a Kafka table source factory for JSON should 
> be added. This issue includes improving the existing JSON descriptor with 
> validation that can be used for other connectors as well. It is up for 
> discussion if we want to split the KafkaJsonTableSource into connector and 
> format such that we can reuse the format for other table sources as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374575#comment-16374575
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170229924
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
 ---
@@ -89,37 +105,58 @@ class DescriptorProperties(normalizeKeys: Boolean = 
true) {
 put(key, clazz.getName)
   }
 
+  /**
+* Adds a string under the given key.
+*/
   def putString(key: String, str: String): Unit = {
 checkNotNull(key)
 checkNotNull(str)
 put(key, str)
   }
 
+  /**
+* Adds a boolean under the given key.
+*/
   def putBoolean(key: String, b: Boolean): Unit = {
 checkNotNull(key)
 put(key, b.toString)
   }
 
+  /**
+* Adds a long under the given key.
+*/
   def putLong(key: String, l: Long): Unit = {
 checkNotNull(key)
 put(key, l.toString)
   }
 
+  /**
+* Adds an integer under the given key.
+*/
   def putInt(key: String, i: Int): Unit = {
 checkNotNull(key)
 put(key, i.toString)
   }
 
+  /**
+* Adds a character under the given key.
+*/
   def putCharacter(key: String, c: Character): Unit = {
 checkNotNull(key)
 checkNotNull(c)
 put(key, c.toString)
   }
 
+  /**
+* Adds a table schema under the given key.
+*/
   def putTableSchema(key: String, schema: TableSchema): Unit = {
 putTableSchema(key, normalizeTableSchema(schema))
   }
 
+  /**
+* Adds a table schema under the given key.
+*/
   def putTableSchema(key: String, nameAndType: Seq[(String, String)]): 
Unit = {
--- End diff --

Remove?


> Add a Kafka table source factory with JSON format support
> -
>
> Key: FLINK-8538
> URL: https://issues.apache.org/jira/browse/FLINK-8538
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Xingcan Cui
>Priority: Major
>
> Similar to CSVTableSourceFactory a Kafka table source factory for JSON should 
> be added. This issue includes improving the existing JSON descriptor with 
> validation that can be used for other connectors as well. It is up for 
> discussion if we want to split the KafkaJsonTableSource into connector and 
> format such that we can reuse the format for other table sources as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374581#comment-16374581
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170230962
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
 ---
@@ -178,46 +244,128 @@ class DescriptorProperties(normalizeKeys: Boolean = 
true) {
 }
   }
 
+  /**
+* Adds an indexed mapping of properties under a common key.
+*
+* For example:
+*
+* schema.fields.0.type = INT, schema.fields.0.name = test
+* schema.fields.1.name = test2
+*
+* The arity of the propertySets can differ.
+*
+* This method is intended for Java code.
+*/
+  def putIndexedVariableProperties(
+  key: String,
+  propertySets: JList[JMap[String, String]])
+: Unit = {
+checkNotNull(key)
+checkNotNull(propertySets)
+putIndexedVariableProperties(key, 
propertySets.asScala.map(_.asScala.toMap))
+  }
+
   // 
--
 
+  /**
+* Returns a string value under the given key if it exists.
+*/
   def getString(key: String): Option[String] = {
 properties.get(key)
   }
 
-  def getCharacter(key: String): Option[Character] = getString(key) match {
-case Some(c) =>
-  if (c.length != 1) {
-throw new ValidationException(s"The value of $key must only 
contain one character.")
-  }
-  Some(c.charAt(0))
+  /**
+* Returns a string value under the given key if it exists.
+* This method is intended for Java code.
+*/
+  def getOptionalString(key: String): Optional[String] = 
toJava(getString(key))
 
-case None => None
+  /**
+* Returns a character value under the given key if it exists.
+*/
+  def getCharacter(key: String): Option[Character] = getString(key).map { 
c =>
+if (c.length != 1) {
+  throw new ValidationException(s"The value of $key must only contain 
one character.")
+}
+c.charAt(0)
   }
 
-  def getBoolean(key: String): Option[Boolean] = getString(key) match {
-case Some(b) => Some(JBoolean.parseBoolean(b))
-
-case None => None
+  /**
+* Returns a class value under the given key if it exists.
+*/
+  def getClass[T](key: String, superClass: Class[T]): Option[Class[T]] = {
+properties.get(key).map { name =>
+  val clazz = try {
+Class.forName(
+  name,
+  true,
+  
Thread.currentThread().getContextClassLoader).asInstanceOf[Class[T]]
+  } catch {
+case e: Exception =>
+  throw new ValidationException(s"Could not get class for key '$key'.", e)
--- End diff --

Add name of class?
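
A minimal sketch of what including the class name in the message could look like, written in Java for illustration. `ClassLookup`, `loadClass`, and the use of `IllegalArgumentException` in place of Flink's `ValidationException` are assumptions for this example and are not taken from the PR.

```java
// Hypothetical helper; stands in for the getClass method shown in the diff above.
final class ClassLookup {

    private ClassLookup() {}

    // Loads the class called `name` and reports both the key and the class name on failure.
    static Class<?> loadClass(String key, String name) {
        try {
            return Class.forName(name, true, Thread.currentThread().getContextClassLoader());
        } catch (ClassNotFoundException e) {
            // IllegalArgumentException is a stand-in for ValidationException.
            throw new IllegalArgumentException(
                "Could not get class '" + name + "' for key '" + key + "'.", e);
        }
    }
}
```

With both values in the message, a user can tell a misspelled class name apart from a wrong property key without looking at the stack trace.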


[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

2018-02-23 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170291034
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala
 ---
@@ -18,48 +18,67 @@
 
 package org.apache.flink.table.descriptors
 
+import java.util
+
 import org.apache.flink.streaming.api.watermark.Watermark
 import org.apache.flink.table.api.ValidationException
 import org.apache.flink.table.descriptors.RowtimeTest.CustomAssigner
 import 
org.apache.flink.table.sources.wmstrategies.PunctuatedWatermarkAssigner
 import org.apache.flink.types.Row
 import org.junit.Test
 
-class RowtimeTest extends DescriptorTestBase {
+import scala.collection.JavaConverters._
 
-  @Test
-  def testRowtime(): Unit = {
-val desc = Rowtime()
-  .timestampsFromField("otherField")
-  .watermarksPeriodicBounding(1000L)
-val expected = Seq(
-  "rowtime.0.version" -> "1",
-  "rowtime.0.timestamps.type" -> "from-field",
-  "rowtime.0.timestamps.from" -> "otherField",
-  "rowtime.0.watermarks.type" -> "periodic-bounding",
-  "rowtime.0.watermarks.delay" -> "1000"
-)
-verifyProperties(desc, expected)
-  }
+class RowtimeTest extends DescriptorTestBase {
 
   @Test(expected = classOf[ValidationException])
   def testInvalidWatermarkType(): Unit = {
-verifyInvalidProperty("rowtime.0.watermarks.type", "xxx")
+verifyInvalidProperty(descriptors().get(0), "rowtime.watermarks.type", 
"xxx")
   }
 
   @Test(expected = classOf[ValidationException])
   def testMissingWatermarkClass(): Unit = {
-verifyMissingProperty("rowtime.0.watermarks.class")
+verifyMissingProperty(descriptors().get(1), "rowtime.watermarks.class")
--- End diff --

Use a constant instead of `"rowtime.watermarks.class"`.


---


[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374594#comment-16374594
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170296912
  
--- Diff: 
flink-formats/flink-json/src/main/java/org/apache/flink/table/descriptors/JsonValidator.java
 ---
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.table.api.ValidationException;
+
+/**
+  * Validator for {@link Json}.
+  */
+public class JsonValidator extends FormatDescriptorValidator {
+
+   public static final String FORMAT_TYPE_VALUE = "json";
+   public static final String FORMAT_SCHEMA = "format.schema";
+   public static final String FORMAT_JSON_SCHEMA = "format.json-schema";
+   public static final String FORMAT_FAIL_ON_MISSING_FIELD = 
"format.fail-on-missing-field";
+
+   @Override
+   public void validate(DescriptorProperties properties) {
+   super.validate(properties);
+   final boolean hasSchema = properties.containsKey(FORMAT_SCHEMA);
+   final boolean hasSchemaString = 
properties.containsKey(FORMAT_JSON_SCHEMA);
+   if (hasSchema && hasSchemaString) {
+   throw new ValidationException("A definition of both a 
schema and JSON schema is not allowed.");
+   } else if (!hasSchema && !hasSchemaString) {
+   throw new ValidationException("A definition of a schema 
and JSON schema is required.");
--- End diff --

replace "and" by "or" -> "A definition of a schema or JSON schema is 
required."
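
For illustration, the "exactly one of two keys" check with the corrected message might be sketched as follows. The key names are copied from the diff; `SchemaKeyCheck`, the plain `Map` argument, and `IllegalArgumentException` (instead of `ValidationException`) are assumptions made so the example is self-contained.

```java
import java.util.Map;

// Hypothetical stand-in for the validate method in the diff above.
final class SchemaKeyCheck {

    static final String FORMAT_SCHEMA = "format.schema";
    static final String FORMAT_JSON_SCHEMA = "format.json-schema";

    private SchemaKeyCheck() {}

    // Accepts the properties only if exactly one of the two schema keys is present.
    static void validate(Map<String, String> properties) {
        final boolean hasSchema = properties.containsKey(FORMAT_SCHEMA);
        final boolean hasJsonSchema = properties.containsKey(FORMAT_JSON_SCHEMA);
        if (hasSchema && hasJsonSchema) {
            throw new IllegalArgumentException(
                "A definition of both a schema and JSON schema is not allowed.");
        } else if (!hasSchema && !hasJsonSchema) {
            throw new IllegalArgumentException(
                "A definition of a schema or JSON schema is required.");
        }
    }
}
```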


[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374589#comment-16374589
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170290993
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala
 ---
@@ -18,48 +18,67 @@
 
 package org.apache.flink.table.descriptors
 
+import java.util
+
 import org.apache.flink.streaming.api.watermark.Watermark
 import org.apache.flink.table.api.ValidationException
 import org.apache.flink.table.descriptors.RowtimeTest.CustomAssigner
 import 
org.apache.flink.table.sources.wmstrategies.PunctuatedWatermarkAssigner
 import org.apache.flink.types.Row
 import org.junit.Test
 
-class RowtimeTest extends DescriptorTestBase {
+import scala.collection.JavaConverters._
 
-  @Test
-  def testRowtime(): Unit = {
-val desc = Rowtime()
-  .timestampsFromField("otherField")
-  .watermarksPeriodicBounding(1000L)
-val expected = Seq(
-  "rowtime.0.version" -> "1",
-  "rowtime.0.timestamps.type" -> "from-field",
-  "rowtime.0.timestamps.from" -> "otherField",
-  "rowtime.0.watermarks.type" -> "periodic-bounding",
-  "rowtime.0.watermarks.delay" -> "1000"
-)
-verifyProperties(desc, expected)
-  }
+class RowtimeTest extends DescriptorTestBase {
 
   @Test(expected = classOf[ValidationException])
   def testInvalidWatermarkType(): Unit = {
-verifyInvalidProperty("rowtime.0.watermarks.type", "xxx")
+verifyInvalidProperty(descriptors().get(0), "rowtime.watermarks.type", 
"xxx")
--- End diff --

use constant instead of `"rowtime.watermarks.type"`


[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

2018-02-23 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170224845
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
 ---
@@ -83,10 +84,32 @@ protected JsonRowDeserializationSchema 
getDeserializationSchema() {
 
@Override
public String explainSource() {
-   return "KafkaJSONTableSource";
+   return "KafkaJsonTableSource";
}
 
-    SETTERS FOR OPTIONAL PARAMETERS
+   @Override
+   public boolean equals(Object o) {
+   if (this == o) {
+   return true;
+   }
+   if (!(o instanceof KafkaJsonTableSource)) {
+   return false;
+   }
+   if (!super.equals(o)) {
+   return false;
+   }
+   KafkaJsonTableSource that = (KafkaJsonTableSource) o;
+   return failOnMissingField == that.failOnMissingField &&
+   Objects.equals(jsonSchema, that.jsonSchema) &&
+   Objects.equals(fieldMapping, that.fieldMapping);
+   }
+
+   @Override
+   public int hashCode() {
+   return Objects.hash(super.hashCode(), jsonSchema, fieldMapping, 
failOnMissingField);
--- End diff --

`TableSchema` does not override `hashCode()`
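
To illustrate the concern: if a field's class compares by content in `equals()` but inherits `Object.hashCode()`, passing that field to `Objects.hash(...)` can give two equal objects different hash codes. The sketch below uses hypothetical stand-ins (`SchemaLike`, `SourceLike`) and assumes such a class; it shows one possible workaround, hashing the field's content instead of the object itself. It is not the fix chosen in the PR.

```java
import java.util.Arrays;
import java.util.Objects;

// Hypothetical schema type: content-based equals(), but hashCode() is inherited from Object.
final class SchemaLike {
    final String[] names;

    SchemaLike(String... names) {
        this.names = names;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof SchemaLike && Arrays.equals(names, ((SchemaLike) o).names);
    }
    // hashCode() is intentionally not overridden to mirror the situation in the comment.
}

// Hypothetical table source that keeps equals() and hashCode() consistent anyway.
final class SourceLike {
    final SchemaLike schema;
    final boolean failOnMissingField;

    SourceLike(SchemaLike schema, boolean failOnMissingField) {
        this.schema = Objects.requireNonNull(schema);
        this.failOnMissingField = failOnMissingField;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof SourceLike)) {
            return false;
        }
        SourceLike that = (SourceLike) o;
        return failOnMissingField == that.failOnMissingField && schema.equals(that.schema);
    }

    @Override
    public int hashCode() {
        // Hash the schema's content, not schema.hashCode(), which is identity-based here.
        return Objects.hash(Arrays.hashCode(schema.names), failOnMissingField);
    }
}
```

The cleaner alternative would be for the schema class itself to override `hashCode()` consistently with `equals()`.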


---


[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374586#comment-16374586
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170271213
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Schema.scala
 ---
@@ -32,11 +32,36 @@ import scala.collection.mutable
   */
 class Schema extends Descriptor {
 
+  private var deriveFields: Option[String] = None
+
   // maps a field name to a list of properties that describe type, origin, 
and the time attribute
   private val tableSchema = mutable.LinkedHashMap[String, 
mutable.LinkedHashMap[String, String]]()
 
   private var lastField: Option[String] = None
 
+  /**
+* Derives field names and types from a preceding connector or format. 
Additional fields that
+* are defined in this schema extend the derived fields. The derived 
fields are
+* added in an alphabetical order according to their field name.
+*/
+  def deriveFieldsAlphabetically(): Schema = {
--- End diff --

I think we should support inferring the format from the schema rather than 
the schema from the format.
This would be more aligned with how it would work in a `CREATE TABLE` 
statement and with how Hive does it, for example. We should still support 
defining the format explicitly, though.


[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

2018-02-23 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170230041
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
 ---
@@ -178,46 +244,128 @@ class DescriptorProperties(normalizeKeys: Boolean = 
true) {
 }
   }
 
+  /**
+* Adds an indexed mapping of properties under a common key.
+*
+* For example:
+*
+* schema.fields.0.type = INT, schema.fields.0.name = test
+* schema.fields.1.name = test2
+*
+* The arity of the propertySets can differ.
+*
+* This method is intended for Java code.
+*/
+  def putIndexedVariableProperties(
--- End diff --

Remove Scala equivalent?


---


[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

2018-02-23 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170257053
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala
 ---
@@ -77,58 +73,113 @@ class RowtimeValidator(val prefix: String = "") 
extends DescriptorValidator {
 object RowtimeValidator {
 
   val ROWTIME = "rowtime"
-
-  // per rowtime properties
-
-  val ROWTIME_VERSION = "version"
-  val TIMESTAMPS_TYPE = "timestamps.type"
-  val TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field"
-  val TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source"
-  val TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom"
-  val TIMESTAMPS_FROM = "timestamps.from"
-  val TIMESTAMPS_CLASS = "timestamps.class"
-  val TIMESTAMPS_SERIALIZED = "timestamps.serialized"
-
-  val WATERMARKS_TYPE = "watermarks.type"
-  val WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = "periodic-ascending"
-  val WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING = "periodic-bounding"
-  val WATERMARKS_TYPE_VALUE_FROM_SOURCE = "from-source"
-  val WATERMARKS_TYPE_VALUE_CUSTOM = "custom"
-  val WATERMARKS_CLASS = "watermarks.class"
-  val WATERMARKS_SERIALIZED = "watermarks.serialized"
-  val WATERMARKS_DELAY = "watermarks.delay"
+  val ROWTIME_TIMESTAMPS_TYPE = "rowtime.timestamps.type"
+  val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field"
+  val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source"
+  val ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom"
+  val ROWTIME_TIMESTAMPS_FROM = "rowtime.timestamps.from"
+  val ROWTIME_TIMESTAMPS_CLASS = "rowtime.timestamps.class"
+  val ROWTIME_TIMESTAMPS_SERIALIZED = "rowtime.timestamps.serialized"
+
+  val ROWTIME_WATERMARKS_TYPE = "rowtime.watermarks.type"
+  val ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = 
"periodic-ascending"
+  val ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING = "periodic-bounding"
+  val ROWTIME_WATERMARKS_TYPE_VALUE_FROM_SOURCE = "from-source"
+  val ROWTIME_WATERMARKS_TYPE_VALUE_CUSTOM = "custom"
+  val ROWTIME_WATERMARKS_CLASS = "rowtime.watermarks.class"
+  val ROWTIME_WATERMARKS_SERIALIZED = "rowtime.watermarks.serialized"
+  val ROWTIME_WATERMARKS_DELAY = "rowtime.watermarks.delay"
--- End diff --

`rowtime.watermarks.bounded.delay`?


---


[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

2018-02-23 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170257084
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala
 ---
@@ -77,58 +73,113 @@ class RowtimeValidator(val prefix: String = "") 
extends DescriptorValidator {
 object RowtimeValidator {
 
   val ROWTIME = "rowtime"
-
-  // per rowtime properties
-
-  val ROWTIME_VERSION = "version"
-  val TIMESTAMPS_TYPE = "timestamps.type"
-  val TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field"
-  val TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source"
-  val TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom"
-  val TIMESTAMPS_FROM = "timestamps.from"
-  val TIMESTAMPS_CLASS = "timestamps.class"
-  val TIMESTAMPS_SERIALIZED = "timestamps.serialized"
-
-  val WATERMARKS_TYPE = "watermarks.type"
-  val WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = "periodic-ascending"
-  val WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING = "periodic-bounding"
-  val WATERMARKS_TYPE_VALUE_FROM_SOURCE = "from-source"
-  val WATERMARKS_TYPE_VALUE_CUSTOM = "custom"
-  val WATERMARKS_CLASS = "watermarks.class"
-  val WATERMARKS_SERIALIZED = "watermarks.serialized"
-  val WATERMARKS_DELAY = "watermarks.delay"
+  val ROWTIME_TIMESTAMPS_TYPE = "rowtime.timestamps.type"
+  val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field"
+  val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source"
+  val ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom"
+  val ROWTIME_TIMESTAMPS_FROM = "rowtime.timestamps.from"
+  val ROWTIME_TIMESTAMPS_CLASS = "rowtime.timestamps.class"
+  val ROWTIME_TIMESTAMPS_SERIALIZED = "rowtime.timestamps.serialized"
+
+  val ROWTIME_WATERMARKS_TYPE = "rowtime.watermarks.type"
+  val ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = 
"periodic-ascending"
+  val ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING = "periodic-bounding"
--- End diff --

BOUNDING -> BOUNDED


---


[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

2018-02-23 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170273135
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactory.scala
 ---
@@ -41,10 +41,10 @@ trait TableSourceFactory[T] {
 *   - connector.type
 *   - format.type
 *
-* Specified versions allow the framework to provide backwards 
compatible properties in case of
--- End diff --

(not related to this change) Should we add something like a `priority` 
property to `TableSourceFactory` that determines the order in which factories 
are matched? If two factories match, we would use the factory with the higher 
priority.
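
A rough sketch of how such a selection could work, in Java. Nothing here exists in Flink: `PrioritizedFactory`, its `matches` and `priority` methods, and `FactorySelection.select` are hypothetical names used only to make the idea concrete.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical factory contract with a priority; not the actual TableSourceFactory trait.
interface PrioritizedFactory {
    boolean matches(Map<String, String> properties);

    int priority();
}

final class FactorySelection {

    private FactorySelection() {}

    // Returns the matching factory with the highest priority, or empty if none matches.
    static Optional<PrioritizedFactory> select(
            List<PrioritizedFactory> factories, Map<String, String> properties) {
        return factories.stream()
            .filter(f -> f.matches(properties))
            .max(Comparator.comparingInt(PrioritizedFactory::priority));
    }
}
```

A real design would also have to decide what happens when two matching factories share the same priority, for example by failing with an ambiguity error.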


---


[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374579#comment-16374579
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170256878
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala
 ---
@@ -77,58 +73,113 @@ class RowtimeValidator(val prefix: String = "") 
extends DescriptorValidator {
 object RowtimeValidator {
 
   val ROWTIME = "rowtime"
-
-  // per rowtime properties
-
-  val ROWTIME_VERSION = "version"
-  val TIMESTAMPS_TYPE = "timestamps.type"
-  val TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field"
-  val TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source"
-  val TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom"
-  val TIMESTAMPS_FROM = "timestamps.from"
-  val TIMESTAMPS_CLASS = "timestamps.class"
-  val TIMESTAMPS_SERIALIZED = "timestamps.serialized"
-
-  val WATERMARKS_TYPE = "watermarks.type"
-  val WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = "periodic-ascending"
-  val WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING = "periodic-bounding"
-  val WATERMARKS_TYPE_VALUE_FROM_SOURCE = "from-source"
-  val WATERMARKS_TYPE_VALUE_CUSTOM = "custom"
-  val WATERMARKS_CLASS = "watermarks.class"
-  val WATERMARKS_SERIALIZED = "watermarks.serialized"
-  val WATERMARKS_DELAY = "watermarks.delay"
+  val ROWTIME_TIMESTAMPS_TYPE = "rowtime.timestamps.type"
+  val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field"
+  val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source"
+  val ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom"
+  val ROWTIME_TIMESTAMPS_FROM = "rowtime.timestamps.from"
+  val ROWTIME_TIMESTAMPS_CLASS = "rowtime.timestamps.class"
+  val ROWTIME_TIMESTAMPS_SERIALIZED = "rowtime.timestamps.serialized"
+
+  val ROWTIME_WATERMARKS_TYPE = "rowtime.watermarks.type"
+  val ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = 
"periodic-ascending"
+  val ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING = "periodic-bounding"
+  val ROWTIME_WATERMARKS_TYPE_VALUE_FROM_SOURCE = "from-source"
+  val ROWTIME_WATERMARKS_TYPE_VALUE_CUSTOM = "custom"
+  val ROWTIME_WATERMARKS_CLASS = "rowtime.watermarks.class"
+  val ROWTIME_WATERMARKS_SERIALIZED = "rowtime.watermarks.serialized"
--- End diff --

`rowtime.watermarks.custom.serialized`?


[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374578#comment-16374578
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170256073
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala
 ---
@@ -77,58 +73,113 @@ class RowtimeValidator(val prefix: String = "") 
extends DescriptorValidator {
 object RowtimeValidator {
 
   val ROWTIME = "rowtime"
-
-  // per rowtime properties
-
-  val ROWTIME_VERSION = "version"
-  val TIMESTAMPS_TYPE = "timestamps.type"
-  val TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field"
-  val TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source"
-  val TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom"
-  val TIMESTAMPS_FROM = "timestamps.from"
-  val TIMESTAMPS_CLASS = "timestamps.class"
-  val TIMESTAMPS_SERIALIZED = "timestamps.serialized"
-
-  val WATERMARKS_TYPE = "watermarks.type"
-  val WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = "periodic-ascending"
-  val WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING = "periodic-bounding"
-  val WATERMARKS_TYPE_VALUE_FROM_SOURCE = "from-source"
-  val WATERMARKS_TYPE_VALUE_CUSTOM = "custom"
-  val WATERMARKS_CLASS = "watermarks.class"
-  val WATERMARKS_SERIALIZED = "watermarks.serialized"
-  val WATERMARKS_DELAY = "watermarks.delay"
+  val ROWTIME_TIMESTAMPS_TYPE = "rowtime.timestamps.type"
+  val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field"
+  val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source"
+  val ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom"
+  val ROWTIME_TIMESTAMPS_FROM = "rowtime.timestamps.from"
+  val ROWTIME_TIMESTAMPS_CLASS = "rowtime.timestamps.class"
--- End diff --

`rowtime.timestamps.custom.class`


[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

2018-02-23 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170256851
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala
 ---
@@ -77,58 +73,113 @@ class RowtimeValidator(val prefix: String = "") 
extends DescriptorValidator {
 object RowtimeValidator {
 
   val ROWTIME = "rowtime"
-
-  // per rowtime properties
-
-  val ROWTIME_VERSION = "version"
-  val TIMESTAMPS_TYPE = "timestamps.type"
-  val TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field"
-  val TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source"
-  val TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom"
-  val TIMESTAMPS_FROM = "timestamps.from"
-  val TIMESTAMPS_CLASS = "timestamps.class"
-  val TIMESTAMPS_SERIALIZED = "timestamps.serialized"
-
-  val WATERMARKS_TYPE = "watermarks.type"
-  val WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = "periodic-ascending"
-  val WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING = "periodic-bounding"
-  val WATERMARKS_TYPE_VALUE_FROM_SOURCE = "from-source"
-  val WATERMARKS_TYPE_VALUE_CUSTOM = "custom"
-  val WATERMARKS_CLASS = "watermarks.class"
-  val WATERMARKS_SERIALIZED = "watermarks.serialized"
-  val WATERMARKS_DELAY = "watermarks.delay"
+  val ROWTIME_TIMESTAMPS_TYPE = "rowtime.timestamps.type"
+  val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field"
+  val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source"
+  val ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom"
+  val ROWTIME_TIMESTAMPS_FROM = "rowtime.timestamps.from"
+  val ROWTIME_TIMESTAMPS_CLASS = "rowtime.timestamps.class"
+  val ROWTIME_TIMESTAMPS_SERIALIZED = "rowtime.timestamps.serialized"
+
+  val ROWTIME_WATERMARKS_TYPE = "rowtime.watermarks.type"
+  val ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = "periodic-ascending"
+  val ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING = "periodic-bounding"
+  val ROWTIME_WATERMARKS_TYPE_VALUE_FROM_SOURCE = "from-source"
+  val ROWTIME_WATERMARKS_TYPE_VALUE_CUSTOM = "custom"
+  val ROWTIME_WATERMARKS_CLASS = "rowtime.watermarks.class"
--- End diff --

`rowtime.watermarks.custom.class`?


---


[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

2018-02-23 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170229569
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
 ---
@@ -128,6 +165,13 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
 )
   }
 
+  /**
+* Adds a table schema under the given key. This method is intended for Java code.
+*/
+  def putTableSchema(key: String, nameAndType: JList[JTuple2[String, String]]): Unit = {
--- End diff --

I think we should drop the Scala equivalent method. This is not a public 
API class that needs a shiny Scala interface but should be usable from Java and 
Scala.


---


[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

2018-02-23 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170230912
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
 ---
@@ -178,46 +244,128 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
 }
   }
 
+  /**
+* Adds an indexed mapping of properties under a common key.
+*
+* For example:
+*
+* schema.fields.0.type = INT, schema.fields.0.name = test
+* schema.fields.1.name = test2
+*
+* The arity of the propertySets can differ.
+*
+* This method is intended for Java code.
+*/
+  def putIndexedVariableProperties(
+  key: String,
+  propertySets: JList[JMap[String, String]])
+: Unit = {
+checkNotNull(key)
+checkNotNull(propertySets)
+putIndexedVariableProperties(key, propertySets.asScala.map(_.asScala.toMap))
+  }
+
   // 
--
 
+  /**
+* Returns a string value under the given key if it exists.
+*/
   def getString(key: String): Option[String] = {
 properties.get(key)
   }
 
-  def getCharacter(key: String): Option[Character] = getString(key) match {
-case Some(c) =>
-  if (c.length != 1) {
-throw new ValidationException(s"The value of $key must only contain one character.")
-  }
-  Some(c.charAt(0))
+  /**
+* Returns a string value under the given key if it exists.
+* This method is intended for Java code.
+*/
+  def getOptionalString(key: String): Optional[String] = toJava(getString(key))
 
-case None => None
+  /**
+* Returns a character value under the given key if it exists.
+*/
+  def getCharacter(key: String): Option[Character] = getString(key).map { c =>
+if (c.length != 1) {
+  throw new ValidationException(s"The value of $key must only contain one character.")
+}
+c.charAt(0)
   }
 
-  def getBoolean(key: String): Option[Boolean] = getString(key) match {
-case Some(b) => Some(JBoolean.parseBoolean(b))
-
-case None => None
+  /**
+* Returns a class value under the given key if it exists.
+*/
+  def getClass[T](key: String, superClass: Class[T]): Option[Class[T]] = {
+properties.get(key).map { name =>
+  val clazz = try {
+Class.forName(
+  name,
+  true,
+  Thread.currentThread().getContextClassLoader).asInstanceOf[Class[T]]
+  } catch {
+case e: Exception =>
+  throw new ValidationException(s"Coult not get class for key '$key'.", e)
--- End diff --

typo: Could
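
For illustration only, here is a minimal Java sketch of the lookup under review with the corrected message. It is not the Flink implementation: `ClassLookupSketch` is a hypothetical name, a plain `Map` stands in for `DescriptorProperties`, `IllegalArgumentException` stands in for `ValidationException`, and the subtype check via `asSubclass` is an assumption, since the quoted diff is cut off before showing how `superClass` is used.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical stand-in for the class lookup in DescriptorProperties. */
final class ClassLookupSketch {

    /**
     * Returns the class named under the given key if the key exists.
     * Throws IllegalArgumentException (stand-in for ValidationException)
     * if the class cannot be loaded or is not a subtype of superClass.
     */
    static <T> Optional<Class<? extends T>> getClass(
            Map<String, String> properties, String key, Class<T> superClass) {
        String name = properties.get(key);
        if (name == null) {
            return Optional.empty();
        }
        try {
            ClassLoader loader = Thread.currentThread().getContextClassLoader();
            Class<?> clazz = Class.forName(name, true, loader);
            // asSubclass throws ClassCastException for unrelated classes (assumed check)
            Class<? extends T> typed = clazz.asSubclass(superClass);
            return Optional.of(typed);
        } catch (ClassNotFoundException | ClassCastException e) {
            throw new IllegalArgumentException(
                "Could not get class for key '" + key + "'.", e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("rowtime.watermarks.class", "java.lang.Integer");
        System.out.println(getClass(props, "rowtime.watermarks.class", Number.class));
    }
}
```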


---


[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374574#comment-16374574
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170230041
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
 ---
@@ -178,46 +244,128 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
 }
   }
 
+  /**
+* Adds an indexed mapping of properties under a common key.
+*
+* For example:
+*
+* schema.fields.0.type = INT, schema.fields.0.name = test
+* schema.fields.1.name = test2
+*
+* The arity of the propertySets can differ.
+*
+* This method is intended for Java code.
+*/
+  def putIndexedVariableProperties(
--- End diff --

Remove Scala equivalent?
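
To make the key layout from the doc comment concrete, the following is a rough Java sketch of how a single Java-friendly method could flatten property sets into indexed keys. The class name `IndexedPropertiesSketch` and the returned `Map` are assumptions for illustration; the real method writes into `DescriptorProperties` and returns `Unit`.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration of the indexed key layout; not Flink code. */
final class IndexedPropertiesSketch {

    /**
     * Flattens property sets into keys of the form "key.index.property".
     * The sets may have different sizes (variable arity).
     */
    static Map<String, String> putIndexedVariableProperties(
            String key, List<Map<String, String>> propertySets) {
        Map<String, String> flat = new LinkedHashMap<>();
        for (int i = 0; i < propertySets.size(); i++) {
            for (Map.Entry<String, String> e : propertySets.get(i).entrySet()) {
                flat.put(key + "." + i + "." + e.getKey(), e.getValue());
            }
        }
        return flat;
    }

    public static void main(String[] args) {
        Map<String, String> first = new LinkedHashMap<>();
        first.put("type", "INT");
        first.put("name", "test");
        Map<String, String> second = Collections.singletonMap("name", "test2");
        // Mirrors the example from the doc comment: field 1 has no type entry.
        System.out.println(
            putIndexedVariableProperties("schema.fields", Arrays.asList(first, second)));
    }
}
```

A `java.util.List` of `java.util.Map` is callable from both Java and Scala (via `asJava`), which is the argument for keeping only this variant.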


> Add a Kafka table source factory with JSON format support
> -
>
> Key: FLINK-8538
> URL: https://issues.apache.org/jira/browse/FLINK-8538
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Xingcan Cui
>Priority: Major
>
> Similar to CSVTableSourceFactory a Kafka table source factory for JSON should 
> be added. This issue includes improving the existing JSON descriptor with 
> validation that can be used for other connectors as well. It is up for 
> discussion if we want to split the KafkaJsonTableSource into connector and 
> format such that we can reuse the format for other table sources as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

2018-02-23 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170271213
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Schema.scala
 ---
@@ -32,11 +32,36 @@ import scala.collection.mutable
   */
 class Schema extends Descriptor {
 
+  private var deriveFields: Option[String] = None
+
   // maps a field name to a list of properties that describe type, origin, and the time attribute
   private val tableSchema = mutable.LinkedHashMap[String, mutable.LinkedHashMap[String, String]]()
 
   private var lastField: Option[String] = None
 
+  /**
+* Derives field names and types from a preceding connector or format. Additional fields that
+* are defined in this schema extend the derived fields. The derived fields are
+* added in an alphabetical order according to their field name.
+*/
+  def deriveFieldsAlphabetically(): Schema = {
--- End diff --

I think we should support inferring the format from the schema rather than 
the schema from the format.
This would be more aligned with how it works in a `CREATE TABLE` statement 
and, for example, with how Hive does it. We should still support defining the 
format explicitly, though.
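
As a rough sketch of the direction suggested here (format inferred from the schema, with an explicit format taking precedence), consider the Java below. Everything in it is hypothetical: `FormatDerivationSketch`, `resolveFormatFields`, and the name-to-type maps are illustrative stand-ins and do not correspond to an existing Flink API.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical sketch: derive the format's fields from the table schema. */
final class FormatDerivationSketch {

    /**
     * Returns the explicitly defined format fields if present;
     * otherwise the format fields are inferred as a copy of the schema fields.
     * Both maps go from field name to a type string.
     */
    static Map<String, String> resolveFormatFields(
            Map<String, String> schemaFields,
            Optional<Map<String, String>> explicitFormatFields) {
        if (explicitFormatFields.isPresent()) {
            // an explicit format definition always wins
            return new LinkedHashMap<>(explicitFormatFields.get());
        }
        // no explicit format: infer it from the schema
        return new LinkedHashMap<>(schemaFields);
    }

    public static void main(String[] args) {
        Map<String, String> schema = new LinkedHashMap<>();
        schema.put("user", "VARCHAR");
        schema.put("amount", "INT");
        System.out.println(resolveFormatFields(schema, Optional.empty()));
    }
}
```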


---


[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374577#comment-16374577
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170257084
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala
 ---
@@ -77,58 +73,113 @@ class RowtimeValidator(val prefix: String = "") extends DescriptorValidator {
 object RowtimeValidator {
 
   val ROWTIME = "rowtime"
-
-  // per rowtime properties
-
-  val ROWTIME_VERSION = "version"
-  val TIMESTAMPS_TYPE = "timestamps.type"
-  val TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field"
-  val TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source"
-  val TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom"
-  val TIMESTAMPS_FROM = "timestamps.from"
-  val TIMESTAMPS_CLASS = "timestamps.class"
-  val TIMESTAMPS_SERIALIZED = "timestamps.serialized"
-
-  val WATERMARKS_TYPE = "watermarks.type"
-  val WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = "periodic-ascending"
-  val WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING = "periodic-bounding"
-  val WATERMARKS_TYPE_VALUE_FROM_SOURCE = "from-source"
-  val WATERMARKS_TYPE_VALUE_CUSTOM = "custom"
-  val WATERMARKS_CLASS = "watermarks.class"
-  val WATERMARKS_SERIALIZED = "watermarks.serialized"
-  val WATERMARKS_DELAY = "watermarks.delay"
+  val ROWTIME_TIMESTAMPS_TYPE = "rowtime.timestamps.type"
+  val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field"
+  val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source"
+  val ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom"
+  val ROWTIME_TIMESTAMPS_FROM = "rowtime.timestamps.from"
+  val ROWTIME_TIMESTAMPS_CLASS = "rowtime.timestamps.class"
+  val ROWTIME_TIMESTAMPS_SERIALIZED = "rowtime.timestamps.serialized"
+
+  val ROWTIME_WATERMARKS_TYPE = "rowtime.watermarks.type"
+  val ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = "periodic-ascending"
+  val ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING = "periodic-bounding"
--- End diff --

BOUNDING -> BOUNDED


[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374587#comment-16374587
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170272030
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala
 ---
@@ -67,14 +92,188 @@ class SchemaValidator(isStreamEnvironment: Boolean = true) extends DescriptorVal
 object SchemaValidator {
 
   val SCHEMA = "schema"
-  val SCHEMA_VERSION = "schema.version"
+  val SCHEMA_PROPERTY_VERSION = "schema.property-version"
+  val SCHEMA_FIELDS = "schema.fields"
+  val SCHEMA_FIELDS_NAME = "name"
+  val SCHEMA_FIELDS_TYPE = "type"
+  val SCHEMA_FIELDS_PROCTIME = "proctime"
+  val SCHEMA_FIELDS_FROM = "from"
+  val SCHEMA_DERIVE_FIELDS = "schema.derive-fields"
+  val SCHEMA_DERIVE_FIELDS_VALUE_ALPHABETICALLY = "alphabetically"
+  val SCHEMA_DERIVE_FIELDS_VALUE_SEQUENTIALLY = "sequentially"
+
+  // utilities
+
+  /**
+* Derives a schema from properties and source.
+*/
+  def deriveSchema(
+  properties: DescriptorProperties,
+  sourceSchema: Option[TableSchema])
+: TableSchema = {
+
+val builder = TableSchema.builder()
+
+val schema = properties.getTableSchema(SCHEMA_FIELDS)
+
+val derivationMode = properties.getString(SCHEMA_DERIVE_FIELDS)
+
+val sourceNamesAndTypes = derivationMode match {
+  case Some(SCHEMA_DERIVE_FIELDS_VALUE_ALPHABETICALLY) if sourceSchema.isDefined =>
+// sort by name
+sourceSchema.get.getColumnNames
+  .zip(sourceSchema.get.getTypes)
+  .sortBy(_._1)
+
+  case Some(SCHEMA_DERIVE_FIELDS_VALUE_SEQUENTIALLY) if sourceSchema.isDefined =>
+sourceSchema.get.getColumnNames.zip(sourceSchema.get.getTypes)
+
+  case Some(_) =>
+throw new ValidationException("Derivation of fields is not supported from this source.")
+
+  case None =>
+Array[(String, TypeInformation[_])]()
+}
+
+// add source fields
+sourceNamesAndTypes.foreach { case (n, t) =>
+  builder.field(n, t)
+}
+
+// add schema fields
+schema.foreach { ts =>
+  val schemaNamesAndTypes = ts.getColumnNames.zip(ts.getTypes)
+  schemaNamesAndTypes.foreach { case (n, t) =>
+  // do not allow overwriting
+  if (sourceNamesAndTypes.exists(_._1 == n)) {
+throw new ValidationException(
+  "Specified schema fields must not overwrite fields derived from the source.")
+  }
+  builder.field(n, t)
+  }
+}
+
+builder.build()
+  }
+
+  /**
+* Derives a schema from properties and source.
+* This method is intended for Java code.
+*/
+  def deriveSchema(
+  properties: DescriptorProperties,
+  sourceSchema: Optional[TableSchema])
+: TableSchema = {
+deriveSchema(
+  properties,
+  Option(sourceSchema.orElse(null)))
+  }
+
+  /**
+* Finds the proctime attribute if defined.
+*/
+  def deriveProctimeAttribute(properties: DescriptorProperties): Option[String] = {
+val names = properties.getIndexedProperty(SCHEMA_FIELDS, SCHEMA_FIELDS_NAME)
+
+for (i <- 0 until names.size) {
+  val isProctime = properties.getBoolean(s"$SCHEMA_FIELDS.$i.$SCHEMA_FIELDS_PROCTIME")
+  isProctime.foreach { isSet =>
+if (isSet) {
+  return names.get(s"$SCHEMA_FIELDS.$i.$SCHEMA_FIELDS_NAME")
+}
+  }
+}
+None
+  }
+
+  /**
+* Finds the proctime attribute if defined.
+* This method is intended for Java code.
+*/
+  def deriveProctimeOptional(properties: DescriptorProperties): Optional[String] = {
+Optional.ofNullable(deriveProctimeAttribute(properties).orNull)
+  }
+
+  /**
+* Finds the rowtime attributes if defined.
+*/
+  def deriveRowtimeAttributes(properties: DescriptorProperties)
+: util.List[RowtimeAttributeDescriptor] = {
+
+val names = properties.getIndexedProperty(SCHEMA_FIELDS, SCHEMA_FIELDS_NAME)
+
+var attributes = new mutable.ArrayBuffer[RowtimeAttributeDescriptor]()
+
+// check for rowtime in every field
+for (i <- 0 until names.size) {
+  RowtimeValidator
+.getRowtimeComponents(properties, s"$SCHEMA_FIELDS.$i.")
+.foreach { case (extractor, strategy) =>
+  // create descriptor
+  
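
The quoted diff is cut off at this point. As a reading aid, the `deriveSchema` logic shown above can be approximated in plain Java as follows. This is a sketch under assumptions, not the Flink code: `DeriveSchemaSketch` is a hypothetical name, ordered maps from field name to type string stand in for `TableSchema`, and `IllegalArgumentException` stands in for `ValidationException`.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

/** Hypothetical re-statement of the derivation rules from the diff. */
final class DeriveSchemaSketch {

    static Map<String, String> deriveSchema(
            Optional<String> derivationMode,
            Optional<Map<String, String>> sourceFields,
            Map<String, String> schemaFields) {

        // fields taken over from the source, in the requested order
        Map<String, String> derived = new LinkedHashMap<>();
        if (derivationMode.isPresent()) {
            String mode = derivationMode.get();
            if (sourceFields.isPresent() && mode.equals("alphabetically")) {
                derived.putAll(new TreeMap<>(sourceFields.get())); // sort by name
            } else if (sourceFields.isPresent() && mode.equals("sequentially")) {
                derived.putAll(sourceFields.get()); // keep the source's iteration order
            } else {
                throw new IllegalArgumentException(
                    "Derivation of fields is not supported from this source.");
            }
        }

        // explicitly specified fields extend the derived ones but must not overwrite them
        Map<String, String> result = new LinkedHashMap<>(derived);
        for (Map.Entry<String, String> field : schemaFields.entrySet()) {
            if (derived.containsKey(field.getKey())) {
                throw new IllegalArgumentException(
                    "Specified schema fields must not overwrite fields derived from the source.");
            }
            result.put(field.getKey(), field.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> source = new LinkedHashMap<>();
        source.put("b", "INT");
        source.put("a", "VARCHAR");
        Map<String, String> schema = new LinkedHashMap<>();
        schema.put("c", "BIGINT");
        System.out.println(
            deriveSchema(Optional.of("alphabetically"), Optional.of(source), schema));
    }
}
```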

[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

2018-02-23 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170296912
  
--- Diff: 
flink-formats/flink-json/src/main/java/org/apache/flink/table/descriptors/JsonValidator.java
 ---
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.table.api.ValidationException;
+
+/**
+  * Validator for {@link Json}.
+  */
+public class JsonValidator extends FormatDescriptorValidator {
+
+   public static final String FORMAT_TYPE_VALUE = "json";
+   public static final String FORMAT_SCHEMA = "format.schema";
+   public static final String FORMAT_JSON_SCHEMA = "format.json-schema";
+   public static final String FORMAT_FAIL_ON_MISSING_FIELD = "format.fail-on-missing-field";
+
+   @Override
+   public void validate(DescriptorProperties properties) {
+   super.validate(properties);
+   final boolean hasSchema = properties.containsKey(FORMAT_SCHEMA);
+   final boolean hasSchemaString = properties.containsKey(FORMAT_JSON_SCHEMA);
+   if (hasSchema && hasSchemaString) {
+   throw new ValidationException("A definition of both a schema and JSON schema is not allowed.");
+   } else if (!hasSchema && !hasSchemaString) {
+   throw new ValidationException("A definition of a schema and JSON schema is required.");
--- End diff --

Replace "and" with "or": "A definition of a schema or JSON schema is 
required."
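
With the suggested wording applied, the exactly-one-of check could look roughly like the sketch below. It is self-contained for illustration only: `JsonSchemaCheckSketch` is a hypothetical class, a plain `Map` replaces `DescriptorProperties`, and `IllegalArgumentException` replaces `ValidationException`; the two property keys are the ones from the diff.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-alone version of the either-or check in JsonValidator. */
final class JsonSchemaCheckSketch {

    static final String FORMAT_SCHEMA = "format.schema";
    static final String FORMAT_JSON_SCHEMA = "format.json-schema";

    /** Accepts exactly one of the two schema definitions. */
    static void validate(Map<String, String> properties) {
        final boolean hasSchema = properties.containsKey(FORMAT_SCHEMA);
        final boolean hasSchemaString = properties.containsKey(FORMAT_JSON_SCHEMA);
        if (hasSchema && hasSchemaString) {
            throw new IllegalArgumentException(
                "A definition of both a schema and JSON schema is not allowed.");
        } else if (!hasSchema && !hasSchemaString) {
            throw new IllegalArgumentException(
                "A definition of a schema or JSON schema is required.");
        }
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put(FORMAT_JSON_SCHEMA, "{ \"type\": \"object\" }");
        validate(props); // passes: exactly one definition is present
        System.out.println("valid");
    }
}
```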


---


[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374595#comment-16374595
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170298004
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorTestBase.scala
 ---
@@ -18,37 +18,85 @@
 
 package org.apache.flink.table.descriptors
 
+import org.apache.flink.util.Preconditions
 import org.junit.Assert.assertEquals
+import org.junit.Test
+
+import scala.collection.JavaConverters._
 
 abstract class DescriptorTestBase {
 
   /**
-* Returns a valid descriptor.
+* Returns a set of valid descriptors.
+* This method is implemented in both Scala and Java.
+*/
+  def descriptors(): java.util.List[Descriptor]
+
+  /**
+* Returns a set of properties for each valid descriptor.
+* This code is implemented in both Scala and Java.
 */
-  def descriptor(): Descriptor
+  def properties(): java.util.List[java.util.Map[String, String]]
 
   /**
-* Returns a validator that can validate this descriptor.
+* Returns a validator that can validate all valid descriptors.
 */
   def validator(): DescriptorValidator
 
-  def verifyProperties(descriptor: Descriptor, expected: Seq[(String, String)]): Unit = {
+  @Test
+  def testValidation(): Unit = {
+val d = descriptors().asScala
+val p = properties().asScala
+
+Preconditions.checkArgument(d.length == p.length)
+
+d.zip(p).foreach { case (desc, props) =>
+  verifyProperties(desc, props.asScala.toMap)
+}
+  }
+
+  def verifyProperties(descriptor: Descriptor, expected: Map[String, String]): Unit = {
 val normProps = new DescriptorProperties
 descriptor.addProperties(normProps)
-assertEquals(expected.toMap, normProps.asMap)
+assertEquals(expected, normProps.asScalaMap)
   }
 
-  def verifyInvalidProperty(property: String, invalidValue: String): Unit = {
+  def verifyInvalidProperty(
--- End diff --

rename to `addPropertyAndVerify()`


[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374584#comment-16374584
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170257053
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala
 ---
@@ -77,58 +73,113 @@ class RowtimeValidator(val prefix: String = "") extends DescriptorValidator {
 object RowtimeValidator {
 
   val ROWTIME = "rowtime"
-
-  // per rowtime properties
-
-  val ROWTIME_VERSION = "version"
-  val TIMESTAMPS_TYPE = "timestamps.type"
-  val TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field"
-  val TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source"
-  val TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom"
-  val TIMESTAMPS_FROM = "timestamps.from"
-  val TIMESTAMPS_CLASS = "timestamps.class"
-  val TIMESTAMPS_SERIALIZED = "timestamps.serialized"
-
-  val WATERMARKS_TYPE = "watermarks.type"
-  val WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = "periodic-ascending"
-  val WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING = "periodic-bounding"
-  val WATERMARKS_TYPE_VALUE_FROM_SOURCE = "from-source"
-  val WATERMARKS_TYPE_VALUE_CUSTOM = "custom"
-  val WATERMARKS_CLASS = "watermarks.class"
-  val WATERMARKS_SERIALIZED = "watermarks.serialized"
-  val WATERMARKS_DELAY = "watermarks.delay"
+  val ROWTIME_TIMESTAMPS_TYPE = "rowtime.timestamps.type"
+  val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field"
+  val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source"
+  val ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom"
+  val ROWTIME_TIMESTAMPS_FROM = "rowtime.timestamps.from"
+  val ROWTIME_TIMESTAMPS_CLASS = "rowtime.timestamps.class"
+  val ROWTIME_TIMESTAMPS_SERIALIZED = "rowtime.timestamps.serialized"
+
+  val ROWTIME_WATERMARKS_TYPE = "rowtime.watermarks.type"
+  val ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = "periodic-ascending"
+  val ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING = "periodic-bounding"
+  val ROWTIME_WATERMARKS_TYPE_VALUE_FROM_SOURCE = "from-source"
+  val ROWTIME_WATERMARKS_TYPE_VALUE_CUSTOM = "custom"
+  val ROWTIME_WATERMARKS_CLASS = "rowtime.watermarks.class"
+  val ROWTIME_WATERMARKS_SERIALIZED = "rowtime.watermarks.serialized"
+  val ROWTIME_WATERMARKS_DELAY = "rowtime.watermarks.delay"
--- End diff --

`rowtime.watermarks.bounded.delay`?


[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374580#comment-16374580
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170256851
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala
 ---
@@ -77,58 +73,113 @@ class RowtimeValidator(val prefix: String = "") extends DescriptorValidator {
 object RowtimeValidator {
 
   val ROWTIME = "rowtime"
-
-  // per rowtime properties
-
-  val ROWTIME_VERSION = "version"
-  val TIMESTAMPS_TYPE = "timestamps.type"
-  val TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field"
-  val TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source"
-  val TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom"
-  val TIMESTAMPS_FROM = "timestamps.from"
-  val TIMESTAMPS_CLASS = "timestamps.class"
-  val TIMESTAMPS_SERIALIZED = "timestamps.serialized"
-
-  val WATERMARKS_TYPE = "watermarks.type"
-  val WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = "periodic-ascending"
-  val WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING = "periodic-bounding"
-  val WATERMARKS_TYPE_VALUE_FROM_SOURCE = "from-source"
-  val WATERMARKS_TYPE_VALUE_CUSTOM = "custom"
-  val WATERMARKS_CLASS = "watermarks.class"
-  val WATERMARKS_SERIALIZED = "watermarks.serialized"
-  val WATERMARKS_DELAY = "watermarks.delay"
+  val ROWTIME_TIMESTAMPS_TYPE = "rowtime.timestamps.type"
+  val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field"
+  val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source"
+  val ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom"
+  val ROWTIME_TIMESTAMPS_FROM = "rowtime.timestamps.from"
+  val ROWTIME_TIMESTAMPS_CLASS = "rowtime.timestamps.class"
+  val ROWTIME_TIMESTAMPS_SERIALIZED = "rowtime.timestamps.serialized"
+
+  val ROWTIME_WATERMARKS_TYPE = "rowtime.watermarks.type"
+  val ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = "periodic-ascending"
+  val ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING = "periodic-bounding"
+  val ROWTIME_WATERMARKS_TYPE_VALUE_FROM_SOURCE = "from-source"
+  val ROWTIME_WATERMARKS_TYPE_VALUE_CUSTOM = "custom"
+  val ROWTIME_WATERMARKS_CLASS = "rowtime.watermarks.class"
--- End diff --

`rowtime.watermarks.custom.class`?


[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374592#comment-16374592
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170291034
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/RowtimeTest.scala
 ---
@@ -18,48 +18,67 @@
 
 package org.apache.flink.table.descriptors
 
+import java.util
+
 import org.apache.flink.streaming.api.watermark.Watermark
 import org.apache.flink.table.api.ValidationException
 import org.apache.flink.table.descriptors.RowtimeTest.CustomAssigner
 import org.apache.flink.table.sources.wmstrategies.PunctuatedWatermarkAssigner
 import org.apache.flink.types.Row
 import org.junit.Test
 
-class RowtimeTest extends DescriptorTestBase {
+import scala.collection.JavaConverters._
 
-  @Test
-  def testRowtime(): Unit = {
-val desc = Rowtime()
-  .timestampsFromField("otherField")
-  .watermarksPeriodicBounding(1000L)
-val expected = Seq(
-  "rowtime.0.version" -> "1",
-  "rowtime.0.timestamps.type" -> "from-field",
-  "rowtime.0.timestamps.from" -> "otherField",
-  "rowtime.0.watermarks.type" -> "periodic-bounding",
-  "rowtime.0.watermarks.delay" -> "1000"
-)
-verifyProperties(desc, expected)
-  }
+class RowtimeTest extends DescriptorTestBase {
 
   @Test(expected = classOf[ValidationException])
   def testInvalidWatermarkType(): Unit = {
-verifyInvalidProperty("rowtime.0.watermarks.type", "xxx")
+verifyInvalidProperty(descriptors().get(0), "rowtime.watermarks.type", "xxx")
   }
 
   @Test(expected = classOf[ValidationException])
   def testMissingWatermarkClass(): Unit = {
-verifyMissingProperty("rowtime.0.watermarks.class")
+verifyMissingProperty(descriptors().get(1), "rowtime.watermarks.class")
--- End diff --

use constant


[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374591#comment-16374591
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170273904
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/CsvTest.scala
 ---
@@ -47,53 +79,18 @@ class CsvTest extends DescriptorTestBase {
   "format.fields.3.name" -> "field4",
   "format.fields.3.type" -> "ROW(test INT, row VARCHAR)",
--- End diff --

Shouldn't this fail because CSV does not support nested data?


[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374576#comment-16374576
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170256006
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala
 ---
@@ -77,58 +73,113 @@ class RowtimeValidator(val prefix: String = "") extends DescriptorValidator {
 object RowtimeValidator {
 
   val ROWTIME = "rowtime"
-
-  // per rowtime properties
-
-  val ROWTIME_VERSION = "version"
-  val TIMESTAMPS_TYPE = "timestamps.type"
-  val TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field"
-  val TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source"
-  val TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom"
-  val TIMESTAMPS_FROM = "timestamps.from"
-  val TIMESTAMPS_CLASS = "timestamps.class"
-  val TIMESTAMPS_SERIALIZED = "timestamps.serialized"
-
-  val WATERMARKS_TYPE = "watermarks.type"
-  val WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = "periodic-ascending"
-  val WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING = "periodic-bounding"
-  val WATERMARKS_TYPE_VALUE_FROM_SOURCE = "from-source"
-  val WATERMARKS_TYPE_VALUE_CUSTOM = "custom"
-  val WATERMARKS_CLASS = "watermarks.class"
-  val WATERMARKS_SERIALIZED = "watermarks.serialized"
-  val WATERMARKS_DELAY = "watermarks.delay"
+  val ROWTIME_TIMESTAMPS_TYPE = "rowtime.timestamps.type"
+  val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field"
+  val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source"
+  val ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom"
+  val ROWTIME_TIMESTAMPS_FROM = "rowtime.timestamps.from"
--- End diff --

`rowtime.timestamps.from.field`


> Add a Kafka table source factory with JSON format support
> -
>
> Key: FLINK-8538
> URL: https://issues.apache.org/jira/browse/FLINK-8538
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Xingcan Cui
>Priority: Major
>
> Similar to CSVTableSourceFactory a Kafka table source factory for JSON should 
> be added. This issue includes improving the existing JSON descriptor with 
> validation that can be used for other connectors as well. It is up for 
> discussion if we want to split the KafkaJsonTableSource into connector and 
> format such that we can reuse the format for other table sources as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374582#comment-16374582
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170229569
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
 ---
@@ -128,6 +165,13 @@ class DescriptorProperties(normalizeKeys: Boolean = 
true) {
 )
   }
 
+  /**
+* Adds a table schema under the given key. This method is intended for Java code.
+*/
+  def putTableSchema(key: String, nameAndType: JList[JTuple2[String, String]]): Unit = {
--- End diff --

I think we should drop the Scala-equivalent method. This is not a public 
API class that needs a shiny Scala interface; it only has to be usable from 
both Java and Scala.




[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374585#comment-16374585
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170232160
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
 ---
@@ -246,13 +394,93 @@ class DescriptorProperties(normalizeKeys: Boolean = 
true) {
 Some(schemaBuilder.build())
   }
 
+  /**
+* Returns a table schema under the given key if it exists.
+*/
+  def getOptionalTableSchema(key: String): Optional[TableSchema] = 
toJava(getTableSchema(key))
+
+  /**
+* Returns the type information under the given key if it exists.
+*/
+  def getType(key: String): Option[TypeInformation[_]] = {
+properties.get(key).map(TypeStringUtils.readTypeInfo)
+  }
+
+  /**
+* Returns the type information under the given key if it exists.
+* This method is intended for Java code.
+*/
+  def getOptionalType(key: String): Optional[TypeInformation[_]] = {
+toJava(getType(key))
+  }
+
+  /**
+* Returns a prefix subset of properties.
+*/
+  def getPrefix(prefixKey: String): Map[String, String] = {
+val prefix = prefixKey + '.'
+properties.filterKeys(_.startsWith(prefix)).toSeq.map{ case (k, v) =>
+  k.substring(prefix.length) -> v // remove prefix
+}.toMap
+  }
+
+  /**
+* Returns a prefix subset of properties.
+* This method is intended for Java code.
+*/
+  def getPrefixMap(prefixKey: String): JMap[String, String] = getPrefix(prefixKey).asJava
--- End diff --

I find the different names for methods that do the same thing confusing. I'd 
just remove the Scala methods.
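
As a rough illustration of a single accessor set that works from both languages, here is a minimal Java sketch of the prefix lookup shown in the diff. `PrefixProperties` and `putString` are hypothetical names used only for this example, not Flink classes; the `getPrefix` logic mirrors the Scala code above (filter keys by `prefix + '.'`, then strip the prefix).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for a properties holder; not the Flink DescriptorProperties class.
final class PrefixProperties {
    private final Map<String, String> properties = new HashMap<>();

    void putString(String key, String value) {
        properties.put(key, value);
    }

    // One accessor per concept: java.util.Optional is usable from Java and Scala alike.
    Optional<String> getString(String key) {
        return Optional.ofNullable(properties.get(key));
    }

    // Returns all entries whose key starts with "<prefixKey>.", with that prefix removed.
    Map<String, String> getPrefix(String prefixKey) {
        String prefix = prefixKey + '.';
        Map<String, String> result = new HashMap<>();
        for (Map.Entry<String, String> e : properties.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                result.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        return result;
    }
}
```

With one name per operation there is nothing like `getPrefix` versus `getPrefixMap` to keep apart; Scala callers can still convert the result with `JavaConverters` where they need a Scala collection.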




[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374590#comment-16374590
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170273135
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactory.scala
 ---
@@ -41,10 +41,10 @@ trait TableSourceFactory[T] {
 *   - connector.type
 *   - format.type
 *
-* Specified versions allow the framework to provide backwards compatible properties in case of
--- End diff --

(not related to this change) Should we add something like a `priority` 
property to `TableSourceFactory` that determines the order in which factories 
are matched? If two factories match, we would use the one with the higher 
priority.
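
A minimal Java sketch of what such priority-based matching could look like. Everything here is hypothetical: `PrioritizedFactory`, `SimpleFactory`, and `FactoryMatcher` are invented for illustration and are not part of `TableSourceFactory`; only the `connector.type` key is taken from the Javadoc in the diff.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical factory contract with an added priority; not the Flink interface.
interface PrioritizedFactory {
    boolean matches(Map<String, String> properties);
    int priority();
    String name();
}

// Simple implementation that matches on the "connector.type" property only.
final class SimpleFactory implements PrioritizedFactory {
    private final String name;
    private final String connectorType;
    private final int priority;

    SimpleFactory(String name, String connectorType, int priority) {
        this.name = name;
        this.connectorType = connectorType;
        this.priority = priority;
    }

    @Override
    public boolean matches(Map<String, String> properties) {
        return connectorType.equals(properties.get("connector.type"));
    }

    @Override
    public int priority() {
        return priority;
    }

    @Override
    public String name() {
        return name;
    }
}

final class FactoryMatcher {
    // Among all matching factories, pick the one with the highest priority.
    static Optional<PrioritizedFactory> select(
            List<PrioritizedFactory> factories, Map<String, String> properties) {
        return factories.stream()
            .filter(f -> f.matches(properties))
            .max(Comparator.comparingInt(PrioritizedFactory::priority));
    }
}
```

The sketch leaves open how ties between equal priorities should be resolved; that would need an explicit rule (for example, failing with an ambiguity error).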




[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374583#comment-16374583
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170256141
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala
 ---
@@ -77,58 +73,113 @@ class RowtimeValidator(val prefix: String = "") 
extends DescriptorValidator {
 object RowtimeValidator {
 
   val ROWTIME = "rowtime"
-
-  // per rowtime properties
-
-  val ROWTIME_VERSION = "version"
-  val TIMESTAMPS_TYPE = "timestamps.type"
-  val TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field"
-  val TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source"
-  val TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom"
-  val TIMESTAMPS_FROM = "timestamps.from"
-  val TIMESTAMPS_CLASS = "timestamps.class"
-  val TIMESTAMPS_SERIALIZED = "timestamps.serialized"
-
-  val WATERMARKS_TYPE = "watermarks.type"
-  val WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = "periodic-ascending"
-  val WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING = "periodic-bounding"
-  val WATERMARKS_TYPE_VALUE_FROM_SOURCE = "from-source"
-  val WATERMARKS_TYPE_VALUE_CUSTOM = "custom"
-  val WATERMARKS_CLASS = "watermarks.class"
-  val WATERMARKS_SERIALIZED = "watermarks.serialized"
-  val WATERMARKS_DELAY = "watermarks.delay"
+  val ROWTIME_TIMESTAMPS_TYPE = "rowtime.timestamps.type"
+  val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field"
+  val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source"
+  val ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom"
+  val ROWTIME_TIMESTAMPS_FROM = "rowtime.timestamps.from"
+  val ROWTIME_TIMESTAMPS_CLASS = "rowtime.timestamps.class"
+  val ROWTIME_TIMESTAMPS_SERIALIZED = "rowtime.timestamps.serialized"
--- End diff --

`rowtime.timestamps.custom.serialized`




[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374588#comment-16374588
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170274382
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorTestBase.scala
 ---
@@ -18,37 +18,85 @@
 
 package org.apache.flink.table.descriptors
 
+import org.apache.flink.util.Preconditions
 import org.junit.Assert.assertEquals
+import org.junit.Test
+
+import scala.collection.JavaConverters._
 
 abstract class DescriptorTestBase {
 
   /**
-* Returns a valid descriptor.
+* Returns a set of valid descriptors.
+* This method is implemented in both Scala and Java.
--- End diff --

Why is this important?




[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374572#comment-16374572
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170230028
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
 ---
@@ -155,6 +199,28 @@ class DescriptorProperties(normalizeKeys: Boolean = 
true) {
 }
   }
 
+  /**
+* Adds an indexed sequence of properties (with sub-properties) under a 
common key.
+*
+* For example:
+*
+* schema.fields.0.type = INT, schema.fields.0.name = test
+* schema.fields.1.type = LONG, schema.fields.1.name = test2
+*
+* The arity of each propertyValue must match the arity of propertyKeys.
+*
+* This method is intended for Java code.
+*/
+  def putIndexedFixedProperties(
--- End diff --

Remove Scala equivalent?
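
For reference, the key layout described in the Scaladoc above could be produced by something like the following Java sketch. `IndexedProperties` and its method signature are assumptions made for illustration, not the actual `DescriptorProperties` API.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper; it only demonstrates the "<key>.<index>.<subKey>" layout.
final class IndexedProperties {
    static Map<String, String> putIndexedFixedProperties(
            String key, List<String> propertyKeys, List<List<String>> propertyValues) {
        Map<String, String> result = new LinkedHashMap<>();
        for (int i = 0; i < propertyValues.size(); i++) {
            List<String> values = propertyValues.get(i);
            // The arity of each value list must match the arity of the keys.
            if (values.size() != propertyKeys.size()) {
                throw new IllegalArgumentException(
                    "Values at index " + i + " must have the same arity as the keys.");
            }
            for (int j = 0; j < propertyKeys.size(); j++) {
                result.put(key + '.' + i + '.' + propertyKeys.get(j), values.get(j));
            }
        }
        return result;
    }
}
```

Because the signature uses only `java.util` types, one such method can serve Java callers directly and Scala callers through `JavaConverters`.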




[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374573#comment-16374573
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170230912
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
 ---
@@ -178,46 +244,128 @@ class DescriptorProperties(normalizeKeys: Boolean = 
true) {
 }
   }
 
+  /**
+* Adds an indexed mapping of properties under a common key.
+*
+* For example:
+*
+* schema.fields.0.type = INT, schema.fields.0.name = test
+* schema.fields.1.name = test2
+*
+* The arity of the propertySets can differ.
+*
+* This method is intended for Java code.
+*/
+  def putIndexedVariableProperties(
+  key: String,
+  propertySets: JList[JMap[String, String]])
+: Unit = {
+checkNotNull(key)
+checkNotNull(propertySets)
+putIndexedVariableProperties(key, 
propertySets.asScala.map(_.asScala.toMap))
+  }
+
   // 
--
 
+  /**
+* Returns a string value under the given key if it exists.
+*/
   def getString(key: String): Option[String] = {
 properties.get(key)
   }
 
-  def getCharacter(key: String): Option[Character] = getString(key) match {
-case Some(c) =>
-  if (c.length != 1) {
-throw new ValidationException(s"The value of $key must only 
contain one character.")
-  }
-  Some(c.charAt(0))
+  /**
+* Returns a string value under the given key if it exists.
+* This method is intended for Java code.
+*/
+  def getOptionalString(key: String): Optional[String] = 
toJava(getString(key))
 
-case None => None
+  /**
+* Returns a character value under the given key if it exists.
+*/
+  def getCharacter(key: String): Option[Character] = getString(key).map { 
c =>
+if (c.length != 1) {
+  throw new ValidationException(s"The value of $key must only contain 
one character.")
+}
+c.charAt(0)
   }
 
-  def getBoolean(key: String): Option[Boolean] = getString(key) match {
-case Some(b) => Some(JBoolean.parseBoolean(b))
-
-case None => None
+  /**
+* Returns a class value under the given key if it exists.
+*/
+  def getClass[T](key: String, superClass: Class[T]): Option[Class[T]] = {
+properties.get(key).map { name =>
+  val clazz = try {
+Class.forName(
+  name,
+  true,
+  
Thread.currentThread().getContextClassLoader).asInstanceOf[Class[T]]
+  } catch {
+case e: Exception =>
+  throw new ValidationException(s"Coult not get class for key '$key'.", e)
--- End diff --

typo: Could




[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374571#comment-16374571
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170224845
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
 ---
@@ -83,10 +84,32 @@ protected JsonRowDeserializationSchema 
getDeserializationSchema() {
 
@Override
public String explainSource() {
-   return "KafkaJSONTableSource";
+   return "KafkaJsonTableSource";
}
 
-    SETTERS FOR OPTIONAL PARAMETERS
+   @Override
+   public boolean equals(Object o) {
+   if (this == o) {
+   return true;
+   }
+   if (!(o instanceof KafkaJsonTableSource)) {
+   return false;
+   }
+   if (!super.equals(o)) {
+   return false;
+   }
+   KafkaJsonTableSource that = (KafkaJsonTableSource) o;
+   return failOnMissingField == that.failOnMissingField &&
+   Objects.equals(jsonSchema, that.jsonSchema) &&
+   Objects.equals(fieldMapping, that.fieldMapping);
+   }
+
+   @Override
+   public int hashCode() {
+   return Objects.hash(super.hashCode(), jsonSchema, fieldMapping, failOnMissingField);
--- End diff --

`TableSchema` does not override `hashCode()`
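
To illustrate why this matters, here is a small self-contained Java sketch. The classes are hypothetical stand-ins, not Flink code, and the sketch assumes a schema type that compares by content in `equals()` but inherits `Object.hashCode()`. In that situation two sources that are `equals()` can still produce different hash codes, which breaks the `equals`/`hashCode` contract for the enclosing class.

```java
import java.util.Arrays;
import java.util.Objects;

// Hypothetical schema type: equals() compares content, hashCode() is inherited from Object.
final class SchemaWithoutHash {
    private final String[] columnNames;

    SchemaWithoutHash(String... columnNames) {
        this.columnNames = columnNames;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof SchemaWithoutHash
            && Arrays.equals(columnNames, ((SchemaWithoutHash) o).columnNames);
    }
}

// The same type with a hashCode() that is consistent with equals().
final class SchemaWithHash {
    private final String[] columnNames;

    SchemaWithHash(String... columnNames) {
        this.columnNames = columnNames;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof SchemaWithHash
            && Arrays.equals(columnNames, ((SchemaWithHash) o).columnNames);
    }

    @Override
    public int hashCode() {
        return Arrays.hashCode(columnNames);
    }
}

final class HashContractDemo {
    // Combines field hashes in the same style as the hashCode() under review.
    static int sourceHash(Object schema, boolean failOnMissingField) {
        return Objects.hash(schema, failOnMissingField);
    }
}
```

With `SchemaWithoutHash`, two equal schemas will typically yield different `sourceHash` values because the identity hash is used; with `SchemaWithHash` the values always agree. Either the schema class needs a `hashCode()`, or the field has to be left out of `Objects.hash(...)`.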




[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

2018-02-23 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170256878
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala
 ---
@@ -77,58 +73,113 @@ class RowtimeValidator(val prefix: String = "") 
extends DescriptorValidator {
 object RowtimeValidator {
 
   val ROWTIME = "rowtime"
-
-  // per rowtime properties
-
-  val ROWTIME_VERSION = "version"
-  val TIMESTAMPS_TYPE = "timestamps.type"
-  val TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field"
-  val TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source"
-  val TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom"
-  val TIMESTAMPS_FROM = "timestamps.from"
-  val TIMESTAMPS_CLASS = "timestamps.class"
-  val TIMESTAMPS_SERIALIZED = "timestamps.serialized"
-
-  val WATERMARKS_TYPE = "watermarks.type"
-  val WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = "periodic-ascending"
-  val WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING = "periodic-bounding"
-  val WATERMARKS_TYPE_VALUE_FROM_SOURCE = "from-source"
-  val WATERMARKS_TYPE_VALUE_CUSTOM = "custom"
-  val WATERMARKS_CLASS = "watermarks.class"
-  val WATERMARKS_SERIALIZED = "watermarks.serialized"
-  val WATERMARKS_DELAY = "watermarks.delay"
+  val ROWTIME_TIMESTAMPS_TYPE = "rowtime.timestamps.type"
+  val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field"
+  val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source"
+  val ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom"
+  val ROWTIME_TIMESTAMPS_FROM = "rowtime.timestamps.from"
+  val ROWTIME_TIMESTAMPS_CLASS = "rowtime.timestamps.class"
+  val ROWTIME_TIMESTAMPS_SERIALIZED = "rowtime.timestamps.serialized"
+
+  val ROWTIME_WATERMARKS_TYPE = "rowtime.watermarks.type"
+  val ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = 
"periodic-ascending"
+  val ROWTIME_WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING = "periodic-bounding"
+  val ROWTIME_WATERMARKS_TYPE_VALUE_FROM_SOURCE = "from-source"
+  val ROWTIME_WATERMARKS_TYPE_VALUE_CUSTOM = "custom"
+  val ROWTIME_WATERMARKS_CLASS = "rowtime.watermarks.class"
+  val ROWTIME_WATERMARKS_SERIALIZED = "rowtime.watermarks.serialized"
--- End diff --

`rowtime.watermarks.custom.serialized`?


---


[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

2018-02-23 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170230028
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
 ---
@@ -155,6 +199,28 @@ class DescriptorProperties(normalizeKeys: Boolean = 
true) {
 }
   }
 
+  /**
+* Adds an indexed sequence of properties (with sub-properties) under a 
common key.
+*
+* For example:
+*
+* schema.fields.0.type = INT, schema.fields.0.name = test
+* schema.fields.1.type = LONG, schema.fields.1.name = test2
+*
+* The arity of each propertyValue must match the arity of propertyKeys.
+*
+* This method is intended for Java code.
+*/
+  def putIndexedFixedProperties(
--- End diff --

Remove Scala equivalent?


---


[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

2018-02-23 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170273904
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/CsvTest.scala
 ---
@@ -47,53 +79,18 @@ class CsvTest extends DescriptorTestBase {
   "format.fields.3.name" -> "field4",
   "format.fields.3.type" -> "ROW(test INT, row VARCHAR)",
--- End diff --

Shouldn't this fail because CSV does not support nested data?
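
If the answer is yes, the rejection could look roughly like the sketch below. This is a hypothetical check, not the CSV validator in this PR: `FlatTypeCheck` is an invented name, and detecting nesting by a `ROW(` prefix in the type string is an assumption based only on the type string used in this test.

```java
import java.util.Map;

// Hypothetical validation helper; it rejects nested row types in format field definitions.
final class FlatTypeCheck {
    static void requireFlat(Map<String, String> properties) {
        for (Map.Entry<String, String> e : properties.entrySet()) {
            // Only look at keys of the form format.fields.<index>.type
            boolean isFieldType = e.getKey().matches("format\\.fields\\.\\d+\\.type");
            if (isFieldType && e.getValue().trim().toUpperCase().startsWith("ROW(")) {
                throw new IllegalArgumentException(
                    "Nested type not supported for key '" + e.getKey() + "': " + e.getValue());
            }
        }
    }
}
```

A test for the CSV descriptor would then expect a validation failure for `format.fields.3.type` instead of listing it among the valid properties.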


---


[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

2018-02-23 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170274382
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorTestBase.scala
 ---
@@ -18,37 +18,85 @@
 
 package org.apache.flink.table.descriptors
 
+import org.apache.flink.util.Preconditions
 import org.junit.Assert.assertEquals
+import org.junit.Test
+
+import scala.collection.JavaConverters._
 
 abstract class DescriptorTestBase {
 
   /**
-* Returns a valid descriptor.
+* Returns a set of valid descriptors.
+* This method is implemented in both Scala and Java.
--- End diff --

Why is this important?


---


[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

2018-02-23 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170298004
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorTestBase.scala
 ---
@@ -18,37 +18,85 @@
 
 package org.apache.flink.table.descriptors
 
+import org.apache.flink.util.Preconditions
 import org.junit.Assert.assertEquals
+import org.junit.Test
+
+import scala.collection.JavaConverters._
 
 abstract class DescriptorTestBase {
 
   /**
-* Returns a valid descriptor.
+* Returns a set of valid descriptors.
+* This method is implemented in both Scala and Java.
+*/
+  def descriptors(): java.util.List[Descriptor]
+
+  /**
+* Returns a set of properties for each valid descriptor.
+* This code is implemented in both Scala and Java.
 */
-  def descriptor(): Descriptor
+  def properties(): java.util.List[java.util.Map[String, String]]
 
   /**
-* Returns a validator that can validate this descriptor.
+* Returns a validator that can validate all valid descriptors.
 */
   def validator(): DescriptorValidator
 
-  def verifyProperties(descriptor: Descriptor, expected: Seq[(String, 
String)]): Unit = {
+  @Test
+  def testValidation(): Unit = {
+val d = descriptors().asScala
+val p = properties().asScala
+
+Preconditions.checkArgument(d.length == p.length)
+
+d.zip(p).foreach { case (desc, props) =>
+  verifyProperties(desc, props.asScala.toMap)
+}
+  }
+
+  def verifyProperties(descriptor: Descriptor, expected: Map[String, 
String]): Unit = {
 val normProps = new DescriptorProperties
 descriptor.addProperties(normProps)
-assertEquals(expected.toMap, normProps.asMap)
+assertEquals(expected, normProps.asScalaMap)
   }
 
-  def verifyInvalidProperty(property: String, invalidValue: String): Unit 
= {
+  def verifyInvalidProperty(
--- End diff --

rename to `addPropertyAndVerify()`


---


[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

2018-02-23 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170232160
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
 ---
@@ -246,13 +394,93 @@ class DescriptorProperties(normalizeKeys: Boolean = 
true) {
 Some(schemaBuilder.build())
   }
 
+  /**
+* Returns a table schema under the given key if it exists.
+*/
+  def getOptionalTableSchema(key: String): Optional[TableSchema] = 
toJava(getTableSchema(key))
+
+  /**
+* Returns the type information under the given key if it exists.
+*/
+  def getType(key: String): Option[TypeInformation[_]] = {
+properties.get(key).map(TypeStringUtils.readTypeInfo)
+  }
+
+  /**
+* Returns the type information under the given key if it exists.
+* This method is intended for Java code.
+*/
+  def getOptionalType(key: String): Optional[TypeInformation[_]] = {
+toJava(getType(key))
+  }
+
+  /**
+* Returns a prefix subset of properties.
+*/
+  def getPrefix(prefixKey: String): Map[String, String] = {
+val prefix = prefixKey + '.'
+properties.filterKeys(_.startsWith(prefix)).toSeq.map{ case (k, v) =>
+  k.substring(prefix.length) -> v // remove prefix
+}.toMap
+  }
+
+  /**
+* Returns a prefix subset of properties.
+* This method is intended for Java code.
+*/
+  def getPrefixMap(prefixKey: String): JMap[String, String] = getPrefix(prefixKey).asJava
--- End diff --

I find the different names for methods that do the same thing confusing. I'd 
just remove the Scala methods.


---


[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

2018-02-23 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170256073
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala
 ---
@@ -77,58 +73,113 @@ class RowtimeValidator(val prefix: String = "") 
extends DescriptorValidator {
 object RowtimeValidator {
 
   val ROWTIME = "rowtime"
-
-  // per rowtime properties
-
-  val ROWTIME_VERSION = "version"
-  val TIMESTAMPS_TYPE = "timestamps.type"
-  val TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field"
-  val TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source"
-  val TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom"
-  val TIMESTAMPS_FROM = "timestamps.from"
-  val TIMESTAMPS_CLASS = "timestamps.class"
-  val TIMESTAMPS_SERIALIZED = "timestamps.serialized"
-
-  val WATERMARKS_TYPE = "watermarks.type"
-  val WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = "periodic-ascending"
-  val WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING = "periodic-bounding"
-  val WATERMARKS_TYPE_VALUE_FROM_SOURCE = "from-source"
-  val WATERMARKS_TYPE_VALUE_CUSTOM = "custom"
-  val WATERMARKS_CLASS = "watermarks.class"
-  val WATERMARKS_SERIALIZED = "watermarks.serialized"
-  val WATERMARKS_DELAY = "watermarks.delay"
+  val ROWTIME_TIMESTAMPS_TYPE = "rowtime.timestamps.type"
+  val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field"
+  val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source"
+  val ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom"
+  val ROWTIME_TIMESTAMPS_FROM = "rowtime.timestamps.from"
+  val ROWTIME_TIMESTAMPS_CLASS = "rowtime.timestamps.class"
--- End diff --

`rowtime.timestamps.custom.class`


---


[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

2018-02-23 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170272030
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala
 ---
@@ -67,14 +92,188 @@ class SchemaValidator(isStreamEnvironment: Boolean = 
true) extends DescriptorVal
 object SchemaValidator {
 
   val SCHEMA = "schema"
-  val SCHEMA_VERSION = "schema.version"
+  val SCHEMA_PROPERTY_VERSION = "schema.property-version"
+  val SCHEMA_FIELDS = "schema.fields"
+  val SCHEMA_FIELDS_NAME = "name"
+  val SCHEMA_FIELDS_TYPE = "type"
+  val SCHEMA_FIELDS_PROCTIME = "proctime"
+  val SCHEMA_FIELDS_FROM = "from"
+  val SCHEMA_DERIVE_FIELDS = "schema.derive-fields"
+  val SCHEMA_DERIVE_FIELDS_VALUE_ALPHABETICALLY = "alphabetically"
+  val SCHEMA_DERIVE_FIELDS_VALUE_SEQUENTIALLY = "sequentially"
+
+  // utilities
+
+  /**
+* Derives a schema from properties and source.
+*/
+  def deriveSchema(
+  properties: DescriptorProperties,
+  sourceSchema: Option[TableSchema])
+: TableSchema = {
+
+val builder = TableSchema.builder()
+
+val schema = properties.getTableSchema(SCHEMA_FIELDS)
+
+val derivationMode = properties.getString(SCHEMA_DERIVE_FIELDS)
+
+val sourceNamesAndTypes = derivationMode match {
+  case Some(SCHEMA_DERIVE_FIELDS_VALUE_ALPHABETICALLY) if 
sourceSchema.isDefined =>
+// sort by name
+sourceSchema.get.getColumnNames
+  .zip(sourceSchema.get.getTypes)
+  .sortBy(_._1)
+
+  case Some(SCHEMA_DERIVE_FIELDS_VALUE_SEQUENTIALLY) if 
sourceSchema.isDefined =>
+sourceSchema.get.getColumnNames.zip(sourceSchema.get.getTypes)
+
+  case Some(_) =>
+throw new ValidationException("Derivation of fields is not 
supported from this source.")
+
+  case None =>
+Array[(String, TypeInformation[_])]()
+}
+
+// add source fields
+sourceNamesAndTypes.foreach { case (n, t) =>
+  builder.field(n, t)
+}
+
+// add schema fields
+schema.foreach { ts =>
+  val schemaNamesAndTypes = ts.getColumnNames.zip(ts.getTypes)
+  schemaNamesAndTypes.foreach { case (n, t) =>
+  // do not allow overwriting
+  if (sourceNamesAndTypes.exists(_._1 == n)) {
+throw new ValidationException(
+  "Specified schema fields must not overwrite fields derived 
from the source.")
+  }
+  builder.field(n, t)
+  }
+}
+
+builder.build()
+  }
+
+  /**
+* Derives a schema from properties and source.
+* This method is intended for Java code.
+*/
+  def deriveSchema(
+  properties: DescriptorProperties,
+  sourceSchema: Optional[TableSchema])
+: TableSchema = {
+deriveSchema(
+  properties,
+  Option(sourceSchema.orElse(null)))
+  }
+
+  /**
+* Finds the proctime attribute if defined.
+*/
+  def deriveProctimeAttribute(properties: DescriptorProperties): 
Option[String] = {
+val names = properties.getIndexedProperty(SCHEMA_FIELDS, 
SCHEMA_FIELDS_NAME)
+
+for (i <- 0 until names.size) {
+  val isProctime = 
properties.getBoolean(s"$SCHEMA_FIELDS.$i.$SCHEMA_FIELDS_PROCTIME")
+  isProctime.foreach { isSet =>
+if (isSet) {
+  return names.get(s"$SCHEMA_FIELDS.$i.$SCHEMA_FIELDS_NAME")
+}
+  }
+}
+None
+  }
+
+  /**
+* Finds the proctime attribute if defined.
+* This method is intended for Java code.
+*/
+  def deriveProctimeOptional(properties: DescriptorProperties): Optional[String] = {
+Optional.ofNullable(deriveProctimeAttribute(properties).orNull)
+  }
+
+  /**
+* Finds the rowtime attributes if defined.
+*/
+  def deriveRowtimeAttributes(properties: DescriptorProperties)
+: util.List[RowtimeAttributeDescriptor] = {
+
+val names = properties.getIndexedProperty(SCHEMA_FIELDS, SCHEMA_FIELDS_NAME)
+
+var attributes = new mutable.ArrayBuffer[RowtimeAttributeDescriptor]()
+
+// check for rowtime in every field
+for (i <- 0 until names.size) {
+  RowtimeValidator
+.getRowtimeComponents(properties, s"$SCHEMA_FIELDS.$i.")
+.foreach { case (extractor, strategy) =>
+  // create descriptor
+  attributes += new RowtimeAttributeDescriptor(
+properties.getString(s"$SCHEMA_FIELDS.$i.$SCHEMA_FIELDS_NAME").get,
+extractor,
+strategy)
+}
+}
+
+

[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

2018-02-23 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170297941
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorTestBase.scala
 ---
@@ -18,37 +18,85 @@
 
 package org.apache.flink.table.descriptors
 
+import org.apache.flink.util.Preconditions
 import org.junit.Assert.assertEquals
+import org.junit.Test
+
+import scala.collection.JavaConverters._
 
 abstract class DescriptorTestBase {
 
   /**
-* Returns a valid descriptor.
+* Returns a set of valid descriptors.
+* This method is implemented in both Scala and Java.
+*/
+  def descriptors(): java.util.List[Descriptor]
+
+  /**
+* Returns a set of properties for each valid descriptor.
+* This code is implemented in both Scala and Java.
 */
-  def descriptor(): Descriptor
+  def properties(): java.util.List[java.util.Map[String, String]]
 
   /**
-* Returns a validator that can validate this descriptor.
+* Returns a validator that can validate all valid descriptors.
 */
   def validator(): DescriptorValidator
 
-  def verifyProperties(descriptor: Descriptor, expected: Seq[(String, String)]): Unit = {
+  @Test
+  def testValidation(): Unit = {
+val d = descriptors().asScala
+val p = properties().asScala
+
+Preconditions.checkArgument(d.length == p.length)
+
+d.zip(p).foreach { case (desc, props) =>
+  verifyProperties(desc, props.asScala.toMap)
+}
+  }
+
+  def verifyProperties(descriptor: Descriptor, expected: Map[String, String]): Unit = {
 val normProps = new DescriptorProperties
 descriptor.addProperties(normProps)
-assertEquals(expected.toMap, normProps.asMap)
+assertEquals(expected, normProps.asScalaMap)
   }
 
-  def verifyInvalidProperty(property: String, invalidValue: String): Unit = {
+  def verifyInvalidProperty(
+  descriptor: Descriptor,
+  property: String,
+  invalidValue: String): Unit = {
 val properties = new DescriptorProperties
-descriptor().addProperties(properties)
+descriptor.addProperties(properties)
 properties.unsafePut(property, invalidValue)
 validator().validate(properties)
   }
 
-  def verifyMissingProperty(removeProperty: String): Unit = {
+  def verifyMissingProperty(descriptor: Descriptor, removeProperty: String): Unit = {
--- End diff --

rename to `removePropertyAndVerify()`


---


[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

2018-02-23 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170229924
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/DescriptorProperties.scala
 ---
@@ -89,37 +105,58 @@ class DescriptorProperties(normalizeKeys: Boolean = true) {
 put(key, clazz.getName)
   }
 
+  /**
+* Adds a string under the given key.
+*/
   def putString(key: String, str: String): Unit = {
 checkNotNull(key)
 checkNotNull(str)
 put(key, str)
   }
 
+  /**
+* Adds a boolean under the given key.
+*/
   def putBoolean(key: String, b: Boolean): Unit = {
 checkNotNull(key)
 put(key, b.toString)
   }
 
+  /**
+* Adds a long under the given key.
+*/
   def putLong(key: String, l: Long): Unit = {
 checkNotNull(key)
 put(key, l.toString)
   }
 
+  /**
+* Adds an integer under the given key.
+*/
   def putInt(key: String, i: Int): Unit = {
 checkNotNull(key)
 put(key, i.toString)
   }
 
+  /**
+* Adds a character under the given key.
+*/
   def putCharacter(key: String, c: Character): Unit = {
 checkNotNull(key)
 checkNotNull(c)
 put(key, c.toString)
   }
 
+  /**
+* Adds a table schema under the given key.
+*/
   def putTableSchema(key: String, schema: TableSchema): Unit = {
 putTableSchema(key, normalizeTableSchema(schema))
   }
 
+  /**
+* Adds a table schema under the given key.
+*/
   def putTableSchema(key: String, nameAndType: Seq[(String, String)]): 
Unit = {
--- End diff --

Remove?


---


[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

2018-02-23 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170256006
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala
 ---
@@ -77,58 +73,113 @@ class RowtimeValidator(val prefix: String = "") extends DescriptorValidator {
 object RowtimeValidator {
 
   val ROWTIME = "rowtime"
-
-  // per rowtime properties
-
-  val ROWTIME_VERSION = "version"
-  val TIMESTAMPS_TYPE = "timestamps.type"
-  val TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field"
-  val TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source"
-  val TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom"
-  val TIMESTAMPS_FROM = "timestamps.from"
-  val TIMESTAMPS_CLASS = "timestamps.class"
-  val TIMESTAMPS_SERIALIZED = "timestamps.serialized"
-
-  val WATERMARKS_TYPE = "watermarks.type"
-  val WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = "periodic-ascending"
-  val WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING = "periodic-bounding"
-  val WATERMARKS_TYPE_VALUE_FROM_SOURCE = "from-source"
-  val WATERMARKS_TYPE_VALUE_CUSTOM = "custom"
-  val WATERMARKS_CLASS = "watermarks.class"
-  val WATERMARKS_SERIALIZED = "watermarks.serialized"
-  val WATERMARKS_DELAY = "watermarks.delay"
+  val ROWTIME_TIMESTAMPS_TYPE = "rowtime.timestamps.type"
+  val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field"
+  val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source"
+  val ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom"
+  val ROWTIME_TIMESTAMPS_FROM = "rowtime.timestamps.from"
--- End diff --

`rowtime.timestamps.from.field`


---


[GitHub] flink pull request #5564: [FLINK-8538] [table] Add a Kafka table source fact...

2018-02-23 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5564#discussion_r170256141
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/RowtimeValidator.scala
 ---
@@ -77,58 +73,113 @@ class RowtimeValidator(val prefix: String = "") extends DescriptorValidator {
 object RowtimeValidator {
 
   val ROWTIME = "rowtime"
-
-  // per rowtime properties
-
-  val ROWTIME_VERSION = "version"
-  val TIMESTAMPS_TYPE = "timestamps.type"
-  val TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field"
-  val TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source"
-  val TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom"
-  val TIMESTAMPS_FROM = "timestamps.from"
-  val TIMESTAMPS_CLASS = "timestamps.class"
-  val TIMESTAMPS_SERIALIZED = "timestamps.serialized"
-
-  val WATERMARKS_TYPE = "watermarks.type"
-  val WATERMARKS_TYPE_VALUE_PERIODIC_ASCENDING = "periodic-ascending"
-  val WATERMARKS_TYPE_VALUE_PERIODIC_BOUNDING = "periodic-bounding"
-  val WATERMARKS_TYPE_VALUE_FROM_SOURCE = "from-source"
-  val WATERMARKS_TYPE_VALUE_CUSTOM = "custom"
-  val WATERMARKS_CLASS = "watermarks.class"
-  val WATERMARKS_SERIALIZED = "watermarks.serialized"
-  val WATERMARKS_DELAY = "watermarks.delay"
+  val ROWTIME_TIMESTAMPS_TYPE = "rowtime.timestamps.type"
+  val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD = "from-field"
+  val ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_SOURCE = "from-source"
+  val ROWTIME_TIMESTAMPS_TYPE_VALUE_CUSTOM = "custom"
+  val ROWTIME_TIMESTAMPS_FROM = "rowtime.timestamps.from"
+  val ROWTIME_TIMESTAMPS_CLASS = "rowtime.timestamps.class"
+  val ROWTIME_TIMESTAMPS_SERIALIZED = "rowtime.timestamps.serialized"
--- End diff --

`rowtime.timestamps.custom.serialized`


---


[jira] [Updated] (FLINK-8737) Creating a union of UnionGate instances will fail with UnsupportedOperationException when retrieving buffers

2018-02-23 Thread Nico Kruber (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8737?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber updated FLINK-8737:
---
Priority: Blocker  (was: Major)

> Creating a union of UnionGate instances will fail with 
> UnsupportedOperationException when retrieving buffers
> 
>
> Key: FLINK-8737
> URL: https://issues.apache.org/jira/browse/FLINK-8737
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: Nico Kruber
>Priority: Blocker
> Fix For: 1.5.0
>
>
> FLINK-8589 introduced a new polling method but did not implement 
> {{UnionInputGate#pollNextBufferOrEvent()}}. This prevents UnionGate instances 
> from containing a UnionGate instance which is explicitly allowed by its 
> documentation and interface.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8736) Memory segment offsets for slices of slices are wrong

2018-02-23 Thread Nico Kruber (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8736?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber updated FLINK-8736:
---
Priority: Critical  (was: Major)

> Memory segment offsets for slices of slices are wrong
> -
>
> Key: FLINK-8736
> URL: https://issues.apache.org/jira/browse/FLINK-8736
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Critical
> Fix For: 1.5.0
>
>
> FLINK-8588 introduced memory segment offsets but the offsets of slices of 
> slices do not account for their parent's slice offset.
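
The offset arithmetic described above can be sketched as follows. This is only an illustration under stated assumptions, not Flink's buffer code: the `Slice` class, its fields, and both slicing methods are hypothetical names introduced here.

```java
public class SliceOffsetSketch {

    /** Hypothetical read-only view onto a memory segment. */
    static final class Slice {
        final int segmentOffset; // start of this view, relative to the memory segment
        final int length;

        Slice(int segmentOffset, int length) {
            this.segmentOffset = segmentOffset;
            this.length = length;
        }

        /** Nested slice whose offset accounts for the parent's own offset. */
        Slice slice(int index, int length) {
            return new Slice(this.segmentOffset + index, length);
        }

        /** Nested slice that forgets the parent's offset (the kind of mistake described). */
        Slice sliceIgnoringParent(int index, int length) {
            return new Slice(index, length);
        }
    }

    public static void main(String[] args) {
        Slice root = new Slice(0, 100);
        Slice child = root.slice(10, 50);

        Slice correct = child.slice(5, 10);
        Slice wrong = child.sliceIgnoringParent(5, 10);

        System.out.println("correct offset = " + correct.segmentOffset);
        System.out.println("wrong offset   = " + wrong.segmentOffset);
    }
}
```

For a first-level slice of a buffer that starts at offset 0 both variants agree, which is why a problem of this kind would only show up for slices of slices.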





[jira] [Updated] (FLINK-8755) SpilledSubpartitionView wrongly relies on the backlog for determining whether more data is available

2018-02-23 Thread Nico Kruber (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8755?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber updated FLINK-8755:
---
Priority: Blocker  (was: Major)

> SpilledSubpartitionView wrongly relies on the backlog for determining whether 
> more data is available
> ---
>
> Key: FLINK-8755
> URL: https://issues.apache.org/jira/browse/FLINK-8755
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Blocker
> Fix For: 1.5.0
>
>
> {code}
> public BufferAndBacklog getNextBuffer() throws IOException, 
> InterruptedException {
> //...
> int newBacklog = parent.decreaseBuffersInBacklog(current);
> return new BufferAndBacklog(current, newBacklog > 0, newBacklog, 
> nextBufferIsEvent);
> {code}
> relies on the backlog to signal further data availability. However, if there 
> are only events left in the buffer queue, their buffers are not included in 
> the backlog counting and therefore, {{isMoreAvailable}} will be wrongly 
> {{false}} here.
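
The mismatch described above can be reproduced with a small stand-alone sketch. The `Kind` enum, the queue, and both helper methods are hypothetical and do not mirror Flink's classes; the queue-based check is shown only for contrast and is not claimed to be the fix that was applied.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class BacklogSketch {

    /** Hypothetical queue entry type: a data buffer or an event. */
    enum Kind { BUFFER, EVENT }

    /** The backlog counts data buffers only; events are not included. */
    static int countBacklog(Deque<Kind> queue) {
        int backlog = 0;
        for (Kind kind : queue) {
            if (kind == Kind.BUFFER) {
                backlog++;
            }
        }
        return backlog;
    }

    /** Availability derived from the backlog, as in the quoted snippet. */
    static boolean moreAvailableFromBacklog(Deque<Kind> remaining) {
        return countBacklog(remaining) > 0;
    }

    /** Availability derived from the queue itself, so events count as well. */
    static boolean moreAvailableFromQueue(Deque<Kind> remaining) {
        return !remaining.isEmpty();
    }

    public static void main(String[] args) {
        Deque<Kind> queue = new ArrayDeque<>();
        queue.add(Kind.BUFFER);
        queue.add(Kind.EVENT);

        queue.poll(); // hand out the data buffer; only the event remains

        System.out.println("from backlog: " + moreAvailableFromBacklog(queue));
        System.out.println("from queue:   " + moreAvailableFromQueue(queue));
    }
}
```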





[jira] [Commented] (FLINK-8751) Canceling a job results in a InterruptedException in the TM

2018-02-23 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8751?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374551#comment-16374551
 ] 

Stephan Ewen commented on FLINK-8751:
-

Yes, that makes sense.

The reason here is that when Flink cancels the user code, it interrupts the 
task thread (to pull the code out of blocking operations). While the thread has 
the interrupted flag set, any subsequent wait that is part of graceful shutdown 
immediately throws an InterruptedException.

I think the right fix is to clear the interrupted flag at the beginning of the 
graceful shutdown code (the beginning of the finally code).
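
A minimal sketch of that idea, using only `java.util.concurrent` and not Flink's `StreamTask` code: the class name, the helper `shutdownGracefully`, and the one-second timeout are assumptions made for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class InterruptClearingSketch {

    /** Returns true if the executor terminated within the timeout. */
    static boolean shutdownGracefully(ExecutorService timerService) {
        // Thread.interrupted() returns the interrupted flag and clears it, so a
        // flag left over from cancellation no longer aborts the wait below.
        Thread.interrupted();

        timerService.shutdown();
        try {
            return timerService.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // A new interrupt arrived while waiting; restore the flag for callers.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService timerService = Executors.newSingleThreadExecutor();

        // Simulate cancellation: the task thread is interrupted before cleanup runs.
        Thread.currentThread().interrupt();

        boolean terminated = shutdownGracefully(timerService);
        System.out.println("terminated = " + terminated);
    }
}
```

Without the `Thread.interrupted()` call, `awaitTermination` would throw an `InterruptedException` right away because the flag is still set. Whether the flag should be restored after shutdown is a separate design decision that this sketch does not address.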

> Canceling a job results in a InterruptedException in the TM
> ---
>
> Key: FLINK-8751
> URL: https://issues.apache.org/jira/browse/FLINK-8751
> Project: Flink
>  Issue Type: Bug
>  Components: TaskManager
>Affects Versions: 1.4.1
>Reporter: Elias Levy
>Priority: Major
>
> Canceling a job results in the following exception reported by the TM: 
> {code:java}
> ERROR org.apache.flink.streaming.runtime.tasks.StreamTask           - Could 
> not shut down timer service java.lang.InterruptedException 
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(Unknown
>  Source) at java.util.concurrent.ThreadPoolExecutor.awaitTermination(Unknown 
> Source) 
>   at 
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService.shutdownAndAwaitPending(SystemProcessingTimeService.java:197)
>  
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:317)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) 
>   at java.lang.Thread.run(Unknown Source){code}
>  





[jira] [Commented] (FLINK-8751) Canceling a job results in a InterruptedException in the TM

2018-02-23 Thread Elias Levy (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8751?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374525#comment-16374525
 ] 

Elias Levy commented on FLINK-8751:
---

Apologies.  I meant TM.  It was a long day.  I've amended the issue.

> Canceling a job results in a InterruptedException in the TM
> ---
>
> Key: FLINK-8751
> URL: https://issues.apache.org/jira/browse/FLINK-8751
> Project: Flink
>  Issue Type: Bug
>  Components: TaskManager
>Affects Versions: 1.4.1
>Reporter: Elias Levy
>Priority: Major
>
> Canceling a job results in the following exception reported by the TM: 
> {code:java}
> ERROR org.apache.flink.streaming.runtime.tasks.StreamTask           - Could 
> not shut down timer service java.lang.InterruptedException 
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(Unknown
>  Source) at java.util.concurrent.ThreadPoolExecutor.awaitTermination(Unknown 
> Source) 
>   at 
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService.shutdownAndAwaitPending(SystemProcessingTimeService.java:197)
>  
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:317)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) 
>   at java.lang.Thread.run(Unknown Source){code}
>  





[jira] [Updated] (FLINK-8751) Canceling a job results in a InterruptedException in the TM

2018-02-23 Thread Elias Levy (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Elias Levy updated FLINK-8751:
--
Description: 
Canceling a job results in the following exception reported by the TM: 
{code:java}
ERROR org.apache.flink.streaming.runtime.tasks.StreamTask           - Could not 
shut down timer service java.lang.InterruptedException 
  at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(Unknown
 Source) at java.util.concurrent.ThreadPoolExecutor.awaitTermination(Unknown 
Source) 
  at 
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService.shutdownAndAwaitPending(SystemProcessingTimeService.java:197)
 
  at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:317) 
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) 
  at java.lang.Thread.run(Unknown Source){code}
 

  was:
Canceling a job results in the following exception reported by the JM: 
{code:java}
ERROR org.apache.flink.streaming.runtime.tasks.StreamTask           - Could not 
shut down timer service java.lang.InterruptedException 
  at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(Unknown
 Source) at java.util.concurrent.ThreadPoolExecutor.awaitTermination(Unknown 
Source) 
  at 
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService.shutdownAndAwaitPending(SystemProcessingTimeService.java:197)
 
  at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:317) 
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) 
  at java.lang.Thread.run(Unknown Source){code}
 

Component/s: (was: JobManager)
 TaskManager
Summary: Canceling a job results in a InterruptedException in the TM  
(was: Canceling a job results in a InterruptedException in the JM)

> Canceling a job results in a InterruptedException in the TM
> ---
>
> Key: FLINK-8751
> URL: https://issues.apache.org/jira/browse/FLINK-8751
> Project: Flink
>  Issue Type: Bug
>  Components: TaskManager
>Affects Versions: 1.4.1
>Reporter: Elias Levy
>Priority: Major
>
> Canceling a job results in the following exception reported by the TM: 
> {code:java}
> ERROR org.apache.flink.streaming.runtime.tasks.StreamTask           - Could 
> not shut down timer service java.lang.InterruptedException 
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(Unknown
>  Source) at java.util.concurrent.ThreadPoolExecutor.awaitTermination(Unknown 
> Source) 
>   at 
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService.shutdownAndAwaitPending(SystemProcessingTimeService.java:197)
>  
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:317)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) 
>   at java.lang.Thread.run(Unknown Source){code}
>  





[jira] [Updated] (FLINK-8760) Correctly set `moreAvailable` flag in SingleInputGate and UnionInputGate

2018-02-23 Thread Piotr Nowojski (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-8760:
--
Priority: Blocker  (was: Major)

> Correctly set `moreAvailable` flag in SingleInputGate and UnionInputGate
> 
>
> Key: FLINK-8760
> URL: https://issues.apache.org/jira/browse/FLINK-8760
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Blocker
>






[jira] [Updated] (FLINK-8694) Make notifyDataAvailable call reliable

2018-02-23 Thread Piotr Nowojski (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-8694:
--
Priority: Blocker  (was: Major)

> Make notifyDataAvailable call reliable
> --
>
> Key: FLINK-8694
> URL: https://issues.apache.org/jira/browse/FLINK-8694
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Blocker
>
> After FLINK-8591, calls to 
> org.apache.flink.runtime.io.network.netty.SequenceNumberingViewReader#notifyDataAvailable
> (and the same for credit-based flow control) can sometimes be ignored due to 
> a race condition.





[jira] [Commented] (FLINK-8694) Make notifyDataAvailable call reliable

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374496#comment-16374496
 ] 

ASF GitHub Bot commented on FLINK-8694:
---

GitHub user pnowojski opened a pull request:

https://github.com/apache/flink/pull/5572

[FLINK-8694][runtime] Fix notifyDataAvailable race condition

This fixes two bugs in the network stack:
https://issues.apache.org/jira/browse/FLINK-8760
https://issues.apache.org/jira/browse/FLINK-8694

## Brief change log

Please check individual commit messages.

## Verifying this change

This PR adds new tests covering the previously bugged cases.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/pnowojski/flink f8694-proper-fix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5572.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5572


commit 388d16118763dddff7d4c3593572169ad3e65c0d
Author: Piotr Nowojski 
Date:   2018-02-23T10:37:37Z

[hotfix][tests] Deduplicate code in SingleInputGateTest

commit e22a44b24ab1e9f02c236440f899a1f4dfdfc873
Author: Piotr Nowojski 
Date:   2018-02-23T11:11:14Z

[hotfix][runtime] Remove duplicated check

commit 5c16e565c4a7f0ffdaec888696d98e3c2c221d99
Author: Piotr Nowojski 
Date:   2018-02-23T10:20:21Z

[FLINK-8760][runtime] Correctly propagate moreAvailable flag through 
SingleInputGate

Previously, if SingleInputGate was re-enqueuing an input channel, 
isMoreAvailable
might incorrectly return false. This might have caused some deadlocks.

commit a451006fd2e38e478ef745fd9de0ebc5fb2fd5c2
Author: Piotr Nowojski 
Date:   2018-02-23T10:27:54Z

[hotfix][tests] Do not hide original exception in 
SuccessAfterNetworkBuffersFailureITCase

commit e70cd04424f0f92b9d5127e7c4a351d3823d20bd
Author: Piotr Nowojski 
Date:   2018-02-23T10:28:20Z

[FLINK-8694][runtime] Fix notifyDataAvailable race condition

Previously there was a race condition that might have resulted in some 
notifyDataAvailable calls being ignored.
This fixes the problem by moving buffersAvailable handling to Subpartitions 
and adds a stress test
for flushAlways (without this fix, this test deadlocks).




> Make notifyDataAvailable call reliable
> --
>
> Key: FLINK-8694
> URL: https://issues.apache.org/jira/browse/FLINK-8694
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>
> After FLINK-8591, calls to 
> org.apache.flink.runtime.io.network.netty.SequenceNumberingViewReader#notifyDataAvailable
> (and the same for credit-based flow control) can sometimes be ignored due to 
> a race condition.





[GitHub] flink pull request #5572: [FLINK-8694][runtime] Fix notifyDataAvailable race...

2018-02-23 Thread pnowojski
GitHub user pnowojski opened a pull request:

https://github.com/apache/flink/pull/5572

[FLINK-8694][runtime] Fix notifyDataAvailable race condition

This fixes two bugs in the network stack:
https://issues.apache.org/jira/browse/FLINK-8760
https://issues.apache.org/jira/browse/FLINK-8694

## Brief change log

Please check individual commit messages.

## Verifying this change

This PR adds new tests covering the previously bugged cases.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/pnowojski/flink f8694-proper-fix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5572.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5572


commit 388d16118763dddff7d4c3593572169ad3e65c0d
Author: Piotr Nowojski 
Date:   2018-02-23T10:37:37Z

[hotfix][tests] Deduplicate code in SingleInputGateTest

commit e22a44b24ab1e9f02c236440f899a1f4dfdfc873
Author: Piotr Nowojski 
Date:   2018-02-23T11:11:14Z

[hotfix][runtime] Remove duplicated check

commit 5c16e565c4a7f0ffdaec888696d98e3c2c221d99
Author: Piotr Nowojski 
Date:   2018-02-23T10:20:21Z

[FLINK-8760][runtime] Correctly propagate moreAvailable flag through 
SingleInputGate

Previously, if SingleInputGate was re-enqueuing an input channel, 
isMoreAvailable
might incorrectly return false. This might have caused some deadlocks.

commit a451006fd2e38e478ef745fd9de0ebc5fb2fd5c2
Author: Piotr Nowojski 
Date:   2018-02-23T10:27:54Z

[hotfix][tests] Do not hide original exception in 
SuccessAfterNetworkBuffersFailureITCase

commit e70cd04424f0f92b9d5127e7c4a351d3823d20bd
Author: Piotr Nowojski 
Date:   2018-02-23T10:28:20Z

[FLINK-8694][runtime] Fix notifyDataAvailable race condition

Previously there was a race condition that might have resulted in some 
notifyDataAvailable calls being ignored.
This fixes the problem by moving buffersAvailable handling to Subpartitions 
and adds a stress test
for flushAlways (without this fix, this test deadlocks).




---


[jira] [Updated] (FLINK-8759) Bump Netty to 4.0.56

2018-02-23 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8759?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-8759:
-
Affects Version/s: 1.5.0

> Bump Netty to 4.0.56
> 
>
> Key: FLINK-8759
> URL: https://issues.apache.org/jira/browse/FLINK-8759
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.5.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
> Fix For: 1.5.0
>
>
> For a bug in Netty's shutdown sequence and overall improvements in Netty, I'd 
> like to bump the version (and stay within the 4.0 series for now). The 
> problem we faced in the past should not be relevant for credit-based flow 
> control anymore and can be worked around (for the fallback code path) by 
> restoring {{LengthFieldBasedFrameDecoder}}'s old behaviour of copying 
> contents to new buffers instead of slicing the existing one (please refer to 
> FLINK-7428 for the inverse direction).





[jira] [Updated] (FLINK-8759) Bump Netty to 4.0.56

2018-02-23 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8759?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-8759:
-
Priority: Blocker  (was: Major)

> Bump Netty to 4.0.56
> 
>
> Key: FLINK-8759
> URL: https://issues.apache.org/jira/browse/FLINK-8759
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.5.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Blocker
> Fix For: 1.5.0
>
>
> For a bug in Netty's shutdown sequence and overall improvements in Netty, I'd 
> like to bump the version (and stay within the 4.0 series for now). The 
> problem we faced in the past should not be relevant for credit-based flow 
> control anymore and can be worked around (for the fallback code path) by 
> restoring {{LengthFieldBasedFrameDecoder}}'s old behaviour of copying 
> contents to new buffers instead of slicing the existing one (please refer to 
> FLINK-7428 for the inverse direction).





[jira] [Commented] (FLINK-8759) Bump Netty to 4.0.56

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8759?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374480#comment-16374480
 ] 

ASF GitHub Bot commented on FLINK-8759:
---

GitHub user NicoK opened a pull request:

https://github.com/apache/flink/pull/5571

[FLINK-8759][network] preparations for the update of netty to version 4.0.56

## What is the purpose of the change

Based on the changes from #5570, this PR prepares a netty version bump by 
circumventing a bug we had with the change from Netty 4.0.27 to 4.0.28 with the 
old non-credit based flow control paths:

Versions >= 4.0.28 contain an improvement by Netty, which slices a Netty 
buffer instead of doing a memory copy 
(https://github.com/netty/netty/issues/3704) in the 
`LengthFieldBasedFrameDecoder`. In some situations, this interacts badly with 
our Netty pipeline leading to `OutOfMemory` errors.

With credit-based flow control this problem should not exist anymore which 
is why we can use the original netty code there.
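
The difference between slicing and copying a frame can be sketched with `java.nio.ByteBuffer` as a stand-in. This is an assumption made to keep the example self-contained; it is not Netty's `ByteBuf` API and not the `extractFrame()` override from this PR, and the helper names are hypothetical.

```java
import java.nio.ByteBuffer;

public class SliceVsCopySketch {

    /** Returns a view that shares memory with the input buffer. */
    static ByteBuffer extractBySlice(ByteBuffer in, int index, int length) {
        ByteBuffer view = in.duplicate(); // independent position/limit, same memory
        view.limit(index + length);
        view.position(index);
        return view.slice();
    }

    /** Returns a new buffer holding its own copy of the frame bytes. */
    static ByteBuffer extractByCopy(ByteBuffer in, int index, int length) {
        ByteBuffer view = in.duplicate();
        view.limit(index + length);
        view.position(index);
        ByteBuffer frame = ByteBuffer.allocate(length);
        frame.put(view);
        frame.flip();
        return frame;
    }

    public static void main(String[] args) {
        ByteBuffer in = ByteBuffer.allocate(8);
        for (int i = 0; i < 8; i++) {
            in.put(i, (byte) i);
        }

        ByteBuffer sliced = extractBySlice(in, 2, 4);
        ByteBuffer copied = extractByCopy(in, 2, 4);

        // Overwrite the input after extraction: the slice observes the change,
        // the copy does not, because only the slice still refers to the input.
        in.put(2, (byte) 99);

        System.out.println("sliced[0] = " + sliced.get(0));
        System.out.println("copied[0] = " + copied.get(0));
    }
}
```

A sliced frame keeps the whole input buffer reachable for as long as the frame is in use, while a copied frame lets the input be released right away at the cost of a memory copy.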

## Brief change log

- override `LengthFieldBasedFrameDecoder#extractFrame()` and implement two 
different code paths depending on whether credit based flow control is on or not

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): **no**
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
  - The serializers: **no**
  - The runtime per-record code paths (performance sensitive): **no** (only 
per buffer)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
  - The S3 file system connector: **no**

## Documentation

  - Does this pull request introduce a new feature? **no**
  - If yes, how is the feature documented? **JavaDocs**


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NicoK/flink flink-8759

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5571.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5571


commit 272899e01c2836a2a5b47958db7d9d9f6cbf471d
Author: Nico Kruber 
Date:   2018-02-23T12:56:29Z

[FLINK-8768][network] Let NettyMessageDecoder inherit from 
LengthFieldBasedFrameDecoder

This replaces one additional step in the pipeline; it not only removes
overhead there but also allows us to override the #extractFrame() method to
restore the old Netty 4.0.27 behaviour for non-credit based code paths which
had a bug with Netty >= 4.0.28 there (see FLINK-8759).

commit 11f673b415bce310cc2195fb6778051c095083fa
Author: Nico Kruber 
Date:   2018-02-23T13:06:00Z

[FLINK-8759][network] preparations for the update of netty to version 4.0.56




> Bump Netty to 4.0.56
> 
>
> Key: FLINK-8759
> URL: https://issues.apache.org/jira/browse/FLINK-8759
> Project: Flink
> Issue Type: Improvement
> Components: Network
> Reporter: Nico Kruber
> Assignee: Nico Kruber
> Priority: Major
> Fix For: 1.5.0
>
>
> For a bug in Netty's shutdown sequence and overall improvements in Netty, I'd 
> like to bump the version (and stay within the 4.0 series for now). The 
> problem we faced in the past should not be relevant for credit-based flow 
> control anymore and can be worked around (for the fallback code path) by 
> restoring {{LengthFieldBasedFrameDecoder}}'s old behaviour of copying 
> contents to new buffers instead of slicing the existing one (please refer to 
> FLINK-7428 for the inverse direction).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5571: [FLINK-8759][network] preparations for the update ...

2018-02-23 Thread NicoK
GitHub user NicoK opened a pull request:

https://github.com/apache/flink/pull/5571

[FLINK-8759][network] preparations for the update of netty to version 4.0.56

## What is the purpose of the change

Based on the changes from #5570, this PR prepares a Netty version bump by 
working around a bug in the old non-credit-based flow control paths that 
appeared with the change from Netty 4.0.27 to 4.0.28:

Versions >= 4.0.28 contain an improvement by Netty, which slices a Netty 
buffer instead of doing a memory copy 
(https://github.com/netty/netty/issues/3704) in the 
`LengthFieldBasedFrameDecoder`. In some situations, this interacts badly with 
our Netty pipeline leading to `OutOfMemory` errors.

With credit-based flow control, this problem should no longer occur, which 
is why we can use the original Netty code there.
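
The difference between the two extraction strategies can be illustrated without Netty. The following is only a minimal sketch that uses `java.nio.ByteBuffer` as a stand-in for Netty's `ByteBuf`; the class `FrameExtraction`, its method names and the `creditBased` flag are hypothetical and not part of Flink or Netty. It shows that a sliced frame keeps sharing memory with the receive buffer, while a copied frame is independent of it.

```java
import java.nio.ByteBuffer;

/** Stdlib model of the two frame-extraction strategies; not Flink or Netty code. */
final class FrameExtraction {

    /** Slicing (the Netty >= 4.0.28 approach): a view sharing memory with the receive buffer. */
    static ByteBuffer sliceFrame(ByteBuffer in, int index, int length) {
        ByteBuffer view = in.duplicate(); // own position/limit, same underlying memory
        view.position(index);
        view.limit(index + length);
        return view.slice();
    }

    /** Copying (the Netty 4.0.27 approach): the frame lives in a freshly allocated buffer. */
    static ByteBuffer copyFrame(ByteBuffer in, int index, int length) {
        ByteBuffer view = in.duplicate();
        view.position(index);
        view.limit(index + length);
        ByteBuffer frame = ByteBuffer.allocate(length);
        frame.put(view);
        frame.flip(); // make the copied bytes readable from position 0
        return frame;
    }

    /** Hypothetical switch: slice on the credit-based path, copy on the fallback path. */
    static ByteBuffer extractFrame(ByteBuffer in, int index, int length, boolean creditBased) {
        return creditBased ? sliceFrame(in, index, length) : copyFrame(in, index, length);
    }

    public static void main(String[] args) {
        ByteBuffer receive = ByteBuffer.allocate(1024);
        receive.put(4, (byte) 42);

        ByteBuffer sliced = extractFrame(receive, 4, 8, true);
        ByteBuffer copied = extractFrame(receive, 4, 8, false);

        receive.put(4, (byte) 7); // overwrite the receive buffer after extraction

        System.out.println("sliced frame sees " + sliced.get(0)); // follows the receive buffer
        System.out.println("copied frame sees " + copied.get(0)); // keeps its own bytes
    }
}
```

A slice avoids the memory copy, but the frame then stays tied to the buffer it was cut from; the copy costs one allocation per frame and decouples the two.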

## Brief change log

- override `LengthFieldBasedFrameDecoder#extractFrame()` and implement two 
different code paths depending on whether credit-based flow control is enabled

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): **no**
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
  - The serializers: **no**
  - The runtime per-record code paths (performance sensitive): **no** (only 
per buffer)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
  - The S3 file system connector: **no**

## Documentation

  - Does this pull request introduce a new feature? **no**
  - If yes, how is the feature documented? **JavaDocs**


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NicoK/flink flink-8759

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5571.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5571


commit 272899e01c2836a2a5b47958db7d9d9f6cbf471d
Author: Nico Kruber 
Date:   2018-02-23T12:56:29Z

[FLINK-8768][network] Let NettyMessageDecoder inherit from 
LengthFieldBasedFrameDecoder

This removes one additional step from the pipeline, which not only reduces
overhead there but also allows us to override the #extractFrame() method to
restore the old Netty 4.0.27 behaviour for non-credit-based code paths, which
had a bug with Netty >= 4.0.28 (see FLINK-8759).

commit 11f673b415bce310cc2195fb6778051c095083fa
Author: Nico Kruber 
Date:   2018-02-23T13:06:00Z

[FLINK-8759][network] preparations for the update of netty to version 4.0.56




---


[jira] [Commented] (FLINK-8725) Separate NFA-state from NFA in CEP library

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8725?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374474#comment-16374474
 ] 

ASF GitHub Bot commented on FLINK-8725:
---

Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/5559
  
@aljoscha I will have a look today.


> Separate NFA-state from NFA in CEP library
> --
>
> Key: FLINK-8725
> URL: https://issues.apache.org/jira/browse/FLINK-8725
> Project: Flink
> Issue Type: Bug
> Components: CEP
> Affects Versions: 1.4.0, 1.5.0
> Reporter: Aljoscha Krettek
> Assignee: Aljoscha Krettek
> Priority: Major
>
> The CEP library currently serialises the static parts of the NFA in the state 
> for each key. This is wasteful, because that part is static and problematic 
> because the static part can contain user code in the form of filter functions.
>  
> We should only serialise the dynamic state of the NFA (current states, seen 
> elements).





[GitHub] flink issue #5559: [FLINK-8725] Separate state from NFA in CEP library

2018-02-23 Thread kl0u
Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/5559
  
@aljoscha I will have a look today.


---


[jira] [Commented] (FLINK-8725) Separate NFA-state from NFA in CEP library

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8725?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374471#comment-16374471
 ] 

ASF GitHub Bot commented on FLINK-8725:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5559
  
R: @kl0u 


> Separate NFA-state from NFA in CEP library
> --
>
> Key: FLINK-8725
> URL: https://issues.apache.org/jira/browse/FLINK-8725
> Project: Flink
> Issue Type: Bug
> Components: CEP
> Affects Versions: 1.4.0, 1.5.0
> Reporter: Aljoscha Krettek
> Assignee: Aljoscha Krettek
> Priority: Major
>
> The CEP library currently serialises the static parts of the NFA in the state 
> for each key. This is wasteful, because that part is static and problematic 
> because the static part can contain user code in the form of filter functions.
>  
> We should only serialise the dynamic state of the NFA (current states, seen 
> elements).





[GitHub] flink issue #5559: [FLINK-8725] Separate state from NFA in CEP library

2018-02-23 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5559
  
R: @kl0u 


---


[jira] [Commented] (FLINK-8768) Change NettyMessageDecoder to inherit from LengthFieldBasedFrameDecoder

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8768?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374464#comment-16374464
 ] 

ASF GitHub Bot commented on FLINK-8768:
---

GitHub user NicoK opened a pull request:

https://github.com/apache/flink/pull/5570

[FLINK-8768][network] Let NettyMessageDecoder inherit from 
LengthFieldBasedFrameDecoder

## What is the purpose of the change

Instead of having two separate steps in the channel pipeline, 
`NettyMessageDecoder` can derive from `LengthFieldBasedFrameDecoder`, which 
reduces overhead and gives us more control over the protocol.

As a first step, we will use this to override the `#extractFrame()` method 
to restore the old Netty 4.0.27 behaviour for non-credit-based code paths, 
which had a bug with Netty >= 4.0.28 (see FLINK-8759).

## Brief change log

- make `NettyMessageDecoder` inherit from `LengthFieldBasedFrameDecoder` 
(beware that this changes the decoder from a `MessageToMessageDecoder` to a 
`ByteToMessageDecoder` with different cleanup invariants!)
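
For readers unfamiliar with length-field framing, the following is a minimal stdlib-only sketch of a single decoder that both splits a byte stream into frames and turns each frame into a message, which is the kind of merge of two steps described above. It is not the Flink implementation: the class `LengthPrefixedDecoder`, the 4-byte length prefix and the UTF-8 string payload are assumptions made purely for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Illustrative length-prefixed decoder; the wire format here is an assumption. */
final class LengthPrefixedDecoder {

    /** Appends one frame: a 4-byte payload length followed by the UTF-8 payload. */
    static void writeFrame(ByteBuffer out, String message) {
        byte[] payload = message.getBytes(StandardCharsets.UTF_8);
        out.putInt(payload.length);
        out.put(payload);
    }

    /** Decodes every complete frame in {@code in}; an incomplete trailing frame stays unread. */
    static List<String> decode(ByteBuffer in) {
        List<String> messages = new ArrayList<>();
        while (in.remaining() >= Integer.BYTES) {
            in.mark();
            int length = in.getInt();
            if (length < 0) {
                throw new IllegalArgumentException("negative frame length: " + length);
            }
            if (in.remaining() < length) {
                in.reset(); // rewind to the length field and wait for more bytes
                break;
            }
            byte[] payload = new byte[length];
            in.get(payload);
            messages.add(new String(payload, StandardCharsets.UTF_8));
        }
        return messages;
    }

    public static void main(String[] args) {
        ByteBuffer stream = ByteBuffer.allocate(64);
        writeFrame(stream, "ab");
        writeFrame(stream, "cde");
        stream.putInt(10);      // header of a third frame ...
        stream.put((byte) 1);   // ... whose payload has not fully arrived yet
        stream.flip();

        System.out.println(decode(stream));     // the two complete messages
        System.out.println(stream.remaining()); // bytes left for the next read
    }
}
```

Because framing and message decoding happen in the same handler, the decoder works on raw bytes and is responsible for leaving partial frames in the input, which is one place where the invariants differ from a message-to-message step.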

## Verifying this change

This change is already covered by existing tests, such as 
`NettyMessageSerializationTest` or other network tests using the 
encoding/decoding pipeline.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): **no**
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
  - The serializers: **no**
  - The runtime per-record code paths (performance sensitive): **no** (only 
per buffer)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
  - The S3 file system connector: **no**

## Documentation

  - Does this pull request introduce a new feature? **no**
  - If yes, how is the feature documented? (JavaDocs)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NicoK/flink flink-8768

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5570.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5570


commit 272899e01c2836a2a5b47958db7d9d9f6cbf471d
Author: Nico Kruber 
Date:   2018-02-23T12:56:29Z

[FLINK-8768][network] Let NettyMessageDecoder inherit from 
LengthFieldBasedFrameDecoder

This removes one additional step from the pipeline, which not only reduces
overhead there but also allows us to override the #extractFrame() method to
restore the old Netty 4.0.27 behaviour for non-credit-based code paths, which
had a bug with Netty >= 4.0.28 (see FLINK-8759).




> Change NettyMessageDecoder to inherit from LengthFieldBasedFrameDecoder
> ---
>
> Key: FLINK-8768
> URL: https://issues.apache.org/jira/browse/FLINK-8768
> Project: Flink
> Issue Type: Improvement
> Components: Network
> Reporter: Nico Kruber
> Assignee: Nico Kruber
> Priority: Major
> Fix For: 1.5.0
>
>
> Let {{NettyMessageDecoder}} inherit from {{LengthFieldBasedFrameDecoder}} 
> instead of being an additional step in the pipeline. This not only 
> removes overhead in the pipeline itself but also allows us to override the 
> {{#extractFrame()}} method to restore the old Netty 4.0.27 behaviour for 
> non-credit-based code paths, which had a bug with Netty >= 4.0.28.




