[jira] [Updated] (FLINK-10674) DistinctAccumulator.remove lead to NPE

2018-11-09 Thread Timo Walther (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-10674:
-
Component/s: (was: flink-contrib)
 Table API & SQL

> DistinctAccumulator.remove lead to NPE
> --
>
> Key: FLINK-10674
> URL: https://issues.apache.org/jira/browse/FLINK-10674
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.6.1
> Environment: Flink 1.6.0
>Reporter: ambition
>Assignee: winifredtang
>Priority: Minor
> Attachments: image-2018-10-25-14-46-03-373.png
>
>
> Our online Flink job had been running for about a week. The job contains this SQL:
> {code:sql}
> select  `time`,  
> lower(trim(os_type)) as os_type, 
> count(distinct feed_id) as feed_total_view  
> from  my_table 
> group by `time`, lower(trim(os_type)){code}
>  
>   Then this NPE occurred:
>  
> {code:java}
> java.lang.NullPointerException
> at scala.Predef$.Long2long(Predef.scala:363)
> at 
> org.apache.flink.table.functions.aggfunctions.DistinctAccumulator.remove(DistinctAccumulator.scala:109)
> at NonWindowedAggregationHelper$894.retract(Unknown Source)
> at 
> org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.processElement(GroupAggProcessFunction.scala:124)
> at 
> org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.processElement(GroupAggProcessFunction.scala:39)
> at 
> org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.processElement(LegacyKeyedProcessOperator.java:88)
> at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> at java.lang.Thread.run(Thread.java:745)
> {code}
>  
>  
> Looking at DistinctAccumulator.remove:
> !image-2018-10-25-14-46-03-373.png!
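>
> The stack trace points at {{scala.Predef.Long2long}}, which unboxes a {{java.lang.Long}}. A minimal sketch (not Flink code) of the unboxing mechanism:
> {code:scala}
> // A java.util.Map returns null for a missing key; assigning that to a
> // primitive scala.Long goes through Predef.Long2long(null) -> NPE.
> val m = new java.util.HashMap[String, java.lang.Long]()
> val v: Long = m.get("missing") // NullPointerException at Predef.scala:363
> {code}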
>  
> This NPE is evidently caused by {{currentCnt}} being null, so we handle it simply like:
> {code:scala}
> def remove(params: Row): Boolean = {
>   if (!distinctValueMap.contains(params)) {
>     true
>   } else {
>     val currentCnt = distinctValueMap.get(params)
>     // guard against null: unboxing a null count is what triggers the NPE
>     if (currentCnt == null || currentCnt == 1) {
>       distinctValueMap.remove(params)
>       true
>     } else {
>       var value = currentCnt - 1L
>       // defensively reset a corrupted (negative) count
>       if (value < 0) {
>         value = 1
>       }
>       distinctValueMap.put(params, value)
>       false
>     }
>   }
> }{code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zentol closed pull request #7055: [FLINK-10823] Add missing scala suffixes

2018-11-09 Thread GitBox
zentol closed pull request #7055: [FLINK-10823] Add missing scala suffixes
URL: https://github.com/apache/flink/pull/7055
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index 04efdef26fc..e482df2e795 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -674,7 +674,7 @@ metrics.reporter.grph.protocol: TCP
 
 ### Prometheus (org.apache.flink.metrics.prometheus.PrometheusReporter)
 
-In order to use this reporter you must copy 
`/opt/flink-metrics-prometheus-{{site.version}}.jar` into the `/lib` folder
+In order to use this reporter you must copy 
`/opt/flink-metrics-prometheus{{site.scala_version_suffix}}-{{site.version}}.jar`
 into the `/lib` folder
 of your Flink distribution.
 
 Parameters:
diff --git a/flink-connectors/flink-connector-kafka-0.10/pom.xml 
b/flink-connectors/flink-connector-kafka-0.10/pom.xml
index e6d80eabb64..36475dfcb51 100644
--- a/flink-connectors/flink-connector-kafka-0.10/pom.xml
+++ b/flink-connectors/flink-connector-kafka-0.10/pom.xml
@@ -187,7 +187,7 @@ under the License.
 

org.apache.flink
-   flink-metrics-jmx
+   
flink-metrics-jmx_${scala.binary.version}
${project.version}
test

diff --git a/flink-connectors/flink-connector-kafka-0.11/pom.xml 
b/flink-connectors/flink-connector-kafka-0.11/pom.xml
index 70ec8266417..0203c0ec625 100644
--- a/flink-connectors/flink-connector-kafka-0.11/pom.xml
+++ b/flink-connectors/flink-connector-kafka-0.11/pom.xml
@@ -196,7 +196,7 @@ under the License.
 

org.apache.flink
-   flink-metrics-jmx
+   
flink-metrics-jmx_${scala.binary.version}
${project.version}
test

diff --git a/flink-connectors/flink-connector-kafka-0.8/pom.xml 
b/flink-connectors/flink-connector-kafka-0.8/pom.xml
index f386cba1a08..b3a65931869 100644
--- a/flink-connectors/flink-connector-kafka-0.8/pom.xml
+++ b/flink-connectors/flink-connector-kafka-0.8/pom.xml
@@ -176,7 +176,7 @@ under the License.
 

org.apache.flink
-   flink-metrics-jmx
+   
flink-metrics-jmx_${scala.binary.version}
${project.version}
test

diff --git a/flink-connectors/flink-connector-kafka-0.9/pom.xml 
b/flink-connectors/flink-connector-kafka-0.9/pom.xml
index 1ef37c28abc..12ab7636830 100644
--- a/flink-connectors/flink-connector-kafka-0.9/pom.xml
+++ b/flink-connectors/flink-connector-kafka-0.9/pom.xml
@@ -137,7 +137,7 @@ under the License.
 

org.apache.flink
-   flink-metrics-jmx
+   
flink-metrics-jmx_${scala.binary.version}
${project.version}
test

diff --git a/flink-connectors/flink-connector-kafka-base/pom.xml 
b/flink-connectors/flink-connector-kafka-base/pom.xml
index 2c9113a6afd..78ebfa6116c 100644
--- a/flink-connectors/flink-connector-kafka-base/pom.xml
+++ b/flink-connectors/flink-connector-kafka-base/pom.xml
@@ -153,7 +153,7 @@ under the License.
 

org.apache.flink
-   flink-metrics-jmx
+   
flink-metrics-jmx_${scala.binary.version}
${project.version}
test

diff --git a/flink-connectors/flink-connector-kafka/pom.xml 
b/flink-connectors/flink-connector-kafka/pom.xml
index 617f1232e35..b90cea2093a 100644
--- a/flink-connectors/flink-connector-kafka/pom.xml
+++ b/flink-connectors/flink-connector-kafka/pom.xml
@@ -205,7 +205,7 @@ under the License.
 

org.apache.flink
-   flink-metrics-jmx
+   
flink-metrics-jmx_${scala.binary.version}
${project.version}
test

diff --git a/flink-connectors/flink-jdbc/pom.xml 
b/flink-connectors/flink-jdbc/pom.xml
index 60cdb67d40a..165154e3819 100644
--- a/flink-connectors/flink-jdbc/pom.xml
+++ b/flink-connectors/flink-jdbc/pom.xml
@@ -30,7 +30,7 @@ under the License.
..

 
-   flink-jdbc
+   flink-jdbc_${scala.binary.version}
flink-jdbc
 
jar
diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml
index a4c67d7b8c0..99e57be7a65 100644
--- a/flink-dist/pom.xml
+++ b/flink-dist/pom.xml
@@ -106,7 +106,7 @@ under the License.
 

   

[jira] [Commented] (FLINK-10823) Missing scala suffixes

2018-11-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16681059#comment-16681059
 ] 

ASF GitHub Bot commented on FLINK-10823:


zentol closed pull request #7055: [FLINK-10823] Add missing scala suffixes
URL: https://github.com/apache/flink/pull/7055
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index 04efdef26fc..e482df2e795 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -674,7 +674,7 @@ metrics.reporter.grph.protocol: TCP
 
 ### Prometheus (org.apache.flink.metrics.prometheus.PrometheusReporter)
 
-In order to use this reporter you must copy 
`/opt/flink-metrics-prometheus-{{site.version}}.jar` into the `/lib` folder
+In order to use this reporter you must copy 
`/opt/flink-metrics-prometheus{{site.scala_version_suffix}}-{{site.version}}.jar`
 into the `/lib` folder
 of your Flink distribution.
 
 Parameters:
diff --git a/flink-connectors/flink-connector-kafka-0.10/pom.xml 
b/flink-connectors/flink-connector-kafka-0.10/pom.xml
index e6d80eabb64..36475dfcb51 100644
--- a/flink-connectors/flink-connector-kafka-0.10/pom.xml
+++ b/flink-connectors/flink-connector-kafka-0.10/pom.xml
@@ -187,7 +187,7 @@ under the License.
 

org.apache.flink
-   flink-metrics-jmx
+   
flink-metrics-jmx_${scala.binary.version}
${project.version}
test

diff --git a/flink-connectors/flink-connector-kafka-0.11/pom.xml 
b/flink-connectors/flink-connector-kafka-0.11/pom.xml
index 70ec8266417..0203c0ec625 100644
--- a/flink-connectors/flink-connector-kafka-0.11/pom.xml
+++ b/flink-connectors/flink-connector-kafka-0.11/pom.xml
@@ -196,7 +196,7 @@ under the License.
 

org.apache.flink
-   flink-metrics-jmx
+   
flink-metrics-jmx_${scala.binary.version}
${project.version}
test

diff --git a/flink-connectors/flink-connector-kafka-0.8/pom.xml 
b/flink-connectors/flink-connector-kafka-0.8/pom.xml
index f386cba1a08..b3a65931869 100644
--- a/flink-connectors/flink-connector-kafka-0.8/pom.xml
+++ b/flink-connectors/flink-connector-kafka-0.8/pom.xml
@@ -176,7 +176,7 @@ under the License.
 

org.apache.flink
-   flink-metrics-jmx
+   
flink-metrics-jmx_${scala.binary.version}
${project.version}
test

diff --git a/flink-connectors/flink-connector-kafka-0.9/pom.xml 
b/flink-connectors/flink-connector-kafka-0.9/pom.xml
index 1ef37c28abc..12ab7636830 100644
--- a/flink-connectors/flink-connector-kafka-0.9/pom.xml
+++ b/flink-connectors/flink-connector-kafka-0.9/pom.xml
@@ -137,7 +137,7 @@ under the License.
 

org.apache.flink
-   flink-metrics-jmx
+   
flink-metrics-jmx_${scala.binary.version}
${project.version}
test

diff --git a/flink-connectors/flink-connector-kafka-base/pom.xml 
b/flink-connectors/flink-connector-kafka-base/pom.xml
index 2c9113a6afd..78ebfa6116c 100644
--- a/flink-connectors/flink-connector-kafka-base/pom.xml
+++ b/flink-connectors/flink-connector-kafka-base/pom.xml
@@ -153,7 +153,7 @@ under the License.
 

org.apache.flink
-   flink-metrics-jmx
+   
flink-metrics-jmx_${scala.binary.version}
${project.version}
test

diff --git a/flink-connectors/flink-connector-kafka/pom.xml 
b/flink-connectors/flink-connector-kafka/pom.xml
index 617f1232e35..b90cea2093a 100644
--- a/flink-connectors/flink-connector-kafka/pom.xml
+++ b/flink-connectors/flink-connector-kafka/pom.xml
@@ -205,7 +205,7 @@ under the License.
 

org.apache.flink
-   flink-metrics-jmx
+   
flink-metrics-jmx_${scala.binary.version}
${project.version}
test

diff --git a/flink-connectors/flink-jdbc/pom.xml 
b/flink-connectors/flink-jdbc/pom.xml
index 60cdb67d40a..165154e3819 100644
--- a/flink-connectors/flink-jdbc/pom.xml
+++ b/flink-connectors/flink-jdbc/pom.xml
@@ -30,7 +30,7 @@ under the License.
..

 
-   flink-jdbc
+   flink-jdbc_${s

[jira] [Closed] (FLINK-10823) Missing scala suffixes

2018-11-09 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10823?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-10823.

Resolution: Fixed

master:
* 146467fe789ab17180392f0400a0024f1cf9bafd (jdbc)
* 73fc3031ed4563d6e4cc00128804219501e8a837 (prometheus)
* 0a2329f0d128539eb968f81630822007f48e0c93 (jmx)
1.7:
* 365eece2b70b99b0a68edb78b6c9abc2ea84cf11 (jdbc)
* 813212926f266a5095e51225cef35347daa85b65 (prometheus)
* b95d69ae4199dcbb0886a96905bdfec38b9988a5 (jmx)

> Missing scala suffixes
> --
>
> Key: FLINK-10823
> URL: https://issues.apache.org/jira/browse/FLINK-10823
> Project: Flink
>  Issue Type: Bug
>  Components: Batch Connectors and Input/Output Formats, Metrics
>Affects Versions: 1.7.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> The jdbc connector and the jmx/prometheus reporters have provided-scope 
> dependencies on scala-infected modules (streaming-java/runtime) and thus also 
> require a scala suffix.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zentol closed pull request #7030: [FLINK-10733] Misleading clean_log_files() in common.sh

2018-11-09 Thread GitBox
zentol closed pull request #7030: [FLINK-10733] Misleading clean_log_files() in 
common.sh
URL: https://github.com/apache/flink/pull/7030
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-end-to-end-tests/test-scripts/common.sh 
b/flink-end-to-end-tests/test-scripts/common.sh
index 4f628fc6d12..9e0ed18dd6f 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -631,11 +631,6 @@ function clean_stdout_files {
 echo "Deleted all stdout files under ${FLINK_DIR}/log/"
 }
 
-function clean_log_files {
-rm ${FLINK_DIR}/log/*
-echo "Deleted all files under ${FLINK_DIR}/log/"
-}
-
 # Expect a string to appear in the log files of the task manager before a 
given timeout
 # $1: expected string
 # $2: timeout in seconds
diff --git a/flink-end-to-end-tests/test-scripts/test-runner-common.sh 
b/flink-end-to-end-tests/test-scripts/test-runner-common.sh
index 0e56d2ac250..1db717251b6 100644
--- a/flink-end-to-end-tests/test-scripts/test-runner-common.sh
+++ b/flink-end-to-end-tests/test-scripts/test-runner-common.sh
@@ -83,7 +83,8 @@ function cleanup_proc {
 
 # Cleans up all temporary folders and files
 function cleanup_tmp_files {
-clean_log_files
+rm ${FLINK_DIR}/log/*
+echo "Deleted all files under ${FLINK_DIR}/log/"
 
 rm -rf ${TEST_DATA_DIR} 2> /dev/null
 echo "Deleted ${TEST_DATA_DIR}"


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10733) Misleading clean_log_files() in common.sh

2018-11-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10733?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16681064#comment-16681064
 ] 

ASF GitHub Bot commented on FLINK-10733:


zentol closed pull request #7030: [FLINK-10733] Misleading clean_log_files() in 
common.sh
URL: https://github.com/apache/flink/pull/7030
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-end-to-end-tests/test-scripts/common.sh 
b/flink-end-to-end-tests/test-scripts/common.sh
index 4f628fc6d12..9e0ed18dd6f 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -631,11 +631,6 @@ function clean_stdout_files {
 echo "Deleted all stdout files under ${FLINK_DIR}/log/"
 }
 
-function clean_log_files {
-rm ${FLINK_DIR}/log/*
-echo "Deleted all files under ${FLINK_DIR}/log/"
-}
-
 # Expect a string to appear in the log files of the task manager before a 
given timeout
 # $1: expected string
 # $2: timeout in seconds
diff --git a/flink-end-to-end-tests/test-scripts/test-runner-common.sh 
b/flink-end-to-end-tests/test-scripts/test-runner-common.sh
index 0e56d2ac250..1db717251b6 100644
--- a/flink-end-to-end-tests/test-scripts/test-runner-common.sh
+++ b/flink-end-to-end-tests/test-scripts/test-runner-common.sh
@@ -83,7 +83,8 @@ function cleanup_proc {
 
 # Cleans up all temporary folders and files
 function cleanup_tmp_files {
-clean_log_files
+rm ${FLINK_DIR}/log/*
+echo "Deleted all files under ${FLINK_DIR}/log/"
 
 rm -rf ${TEST_DATA_DIR} 2> /dev/null
 echo "Deleted ${TEST_DATA_DIR}"


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Misleading clean_log_files() in common.sh
> -
>
> Key: FLINK-10733
> URL: https://issues.apache.org/jira/browse/FLINK-10733
> Project: Flink
>  Issue Type: Bug
>  Components: E2E Tests
>Affects Versions: 1.6.2
>Reporter: Kostas Kloudas
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> In the `common.sh` base script of the end-to-end tests, there is a 
> `clean_stdout_files` function which cleans only the `*.out` files and a 
> `clean_log_files` function which cleans *both* `*.log` and `*.out` files.
> At the end of a test the logs are checked, and if they contain exceptions 
> (even expected but non-whitelisted ones) the test fails. Because of this, 
> some tests chose to call `clean_log_files` so that exceptions are ignored. 
> In that case the `*.out` files are cleaned as well, so if a test was checking 
> for errors in the `.out` files, the test will falsely pass.
> The fix is as simple as renaming the method to something more descriptive 
> like `clean_logs_and_output_files`, but doing so also requires checking 
> whether any existing tests were falsely passing.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-10733) Misleading clean_log_files() in common.sh

2018-11-09 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-10733.

   Resolution: Fixed
Fix Version/s: 1.7.0

master: 89e59d8f398abf3597eb268c7e34927e35d8f383
1.7: 6d138a84e3feab488d679716e1475f95508d2662

> Misleading clean_log_files() in common.sh
> -
>
> Key: FLINK-10733
> URL: https://issues.apache.org/jira/browse/FLINK-10733
> Project: Flink
>  Issue Type: Bug
>  Components: E2E Tests
>Affects Versions: 1.6.2
>Reporter: Kostas Kloudas
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> In the `common.sh` base script of the end-to-end tests, there is a 
> `clean_stdout_files` function which cleans only the `*.out` files and a 
> `clean_log_files` function which cleans *both* `*.log` and `*.out` files.
> At the end of a test the logs are checked, and if they contain exceptions 
> (even expected but non-whitelisted ones) the test fails. Because of this, 
> some tests chose to call `clean_log_files` so that exceptions are ignored. 
> In that case the `*.out` files are cleaned as well, so if a test was checking 
> for errors in the `.out` files, the test will falsely pass.
> The fix is as simple as renaming the method to something more descriptive 
> like `clean_logs_and_output_files`, but doing so also requires checking 
> whether any existing tests were falsely passing.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yanghua commented on issue #6850: [FLINK-10252] Handle oversized metric messages

2018-11-09 Thread GitBox
yanghua commented on issue #6850: [FLINK-10252] Handle oversized metric messages
URL: https://github.com/apache/flink/pull/6850#issuecomment-437288416
 
 
   @zentol Can you give this PR a final check? Thanks.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10252) Handle oversized metric messages

2018-11-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16681070#comment-16681070
 ] 

ASF GitHub Bot commented on FLINK-10252:


yanghua commented on issue #6850: [FLINK-10252] Handle oversized metric messages
URL: https://github.com/apache/flink/pull/6850#issuecomment-437288416
 
 
   @zentol Can you give this PR a final check? Thanks.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Handle oversized metric messages
> ---
>
> Key: FLINK-10252
> URL: https://issues.apache.org/jira/browse/FLINK-10252
> Project: Flink
>  Issue Type: Sub-task
>  Components: Metrics
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.5.6, 1.6.3, 1.7.0
>
>
> Since the {{MetricQueryService}} is implemented as an Akka actor, it can only 
> send messages smaller than the current {{akka.framesize}}. Similarly to 
> FLINK-10251, we should check whether the payload exceeds the maximum 
> framesize and fail fast if it does.
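>
> A hedged sketch of such a fail-fast check (illustrative names, not the actual Flink API):
> {code:scala}
> // Reject a serialized metric dump that would exceed akka.framesize,
> // instead of letting the oversized actor message fail downstream.
> def checkDumpSize(serializedDump: Array[Byte], maxFrameSizeBytes: Long): Unit = {
>   if (serializedDump.length > maxFrameSizeBytes) {
>     throw new IllegalStateException(
>       s"Metric dump of ${serializedDump.length} bytes exceeds the " +
>       s"maximum akka.framesize of $maxFrameSizeBytes bytes.")
>   }
> }
> {code}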



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-10813) Automatically search for missing scala suffixes

2018-11-09 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10813?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-10813.

   Resolution: Fixed
Fix Version/s: (was: 1.8.0)
   1.7.0

master: 3e52fd0d6f16c1f5883925ad6bdf89f32ec4f78d
1.7: 789d8a79612d13d579674a18b552abab414f23a9

> Automatically search for missing scala suffixes
> --
>
> Key: FLINK-10813
> URL: https://issues.apache.org/jira/browse/FLINK-10813
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.7.0
>
>
> To ensure that issues like FLINK-10811 cannot pop up again, we should look 
> into automating the scala-suffix requirement check.
> We already have a script for this check (tools/verify_scala_suffixes.sh); 
> presumably we just have to add it to Travis.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-10645) Update StatefulJobSavepointMigrationITCase for 1.7

2018-11-09 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10645?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai closed FLINK-10645.
---
Resolution: Duplicate

> Update StatefulJobSavepointMigrationITCase for 1.7
> --
>
> Key: FLINK-10645
> URL: https://issues.apache.org/jira/browse/FLINK-10645
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.8.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-10646) Update AbstractOperatorRestoreTestBase for 1.7

2018-11-09 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10646?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai closed FLINK-10646.
---
Resolution: Duplicate

> Update AbstractOperatorRestoreTestBase for 1.7
> --
>
> Key: FLINK-10646
> URL: https://issues.apache.org/jira/browse/FLINK-10646
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.8.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-10650) Update BucketingSinkMigrationTest for 1.7

2018-11-09 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai closed FLINK-10650.
---
Resolution: Duplicate

> Update BucketingSinkMigrationTest for 1.7
> -
>
> Key: FLINK-10650
> URL: https://issues.apache.org/jira/browse/FLINK-10650
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.8.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-10647) Update WindowOperatorMigrationTest for 1.7

2018-11-09 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai closed FLINK-10647.
---
Resolution: Duplicate

> Update WindowOperatorMigrationTest for 1.7
> --
>
> Key: FLINK-10647
> URL: https://issues.apache.org/jira/browse/FLINK-10647
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.8.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10828) Enforce that all TypeSerializers are tested through SerializerTestBase

2018-11-09 Thread Stefan Richter (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16681072#comment-16681072
 ] 

Stefan Richter commented on FLINK-10828:


When I was talking to [~trohrm...@apache.org], he said that we already have a 
similar test that checks that all implementations of a certain interface are 
tested, but I don't remember which test it was. So my assumption is that we can 
do something similar here. For the second part, I think there is no alternative 
to checking this manually. But it also only applies to some serializers. We 
could simplify this by creating a wrapper that makes any serializer artificially 
stateful, e.g. by buffering the reads and writes, so that we can still use 
simple serializers like {{IntSerializer}} and just wrap them when used as inner 
serializers.
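
A hedged sketch of such a wrapper, using a simplified illustrative interface rather than Flink's actual {{TypeSerializer}}:
{code:scala}
import java.io.ByteArrayOutputStream

// Hypothetical minimal serializer interface, for illustration only.
trait SimpleSerializer[T] {
  def serialize(value: T): Array[Byte]
}

// Wraps a stateless inner serializer and adds mutable per-instance state
// (a write buffer), so tests fail if one instance is shared across threads
// instead of being properly duplicated.
class ArtificiallyStatefulSerializer[T](inner: SimpleSerializer[T])
    extends SimpleSerializer[T] {

  private val buffer = new ByteArrayOutputStream()

  override def serialize(value: T): Array[Byte] = {
    buffer.reset() // shared mutable state: racy without proper duplication
    buffer.write(inner.serialize(value))
    buffer.toByteArray
  }
}
{code}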

> Enforce that all TypeSerializers are tested through SerializerTestBase
> --
>
> Key: FLINK-10828
> URL: https://issues.apache.org/jira/browse/FLINK-10828
> Project: Flink
>  Issue Type: Test
>  Components: Tests
>Reporter: Stefan Richter
>Priority: Major
>
> As pointed out in FLINK-10827, type serializers are a common source of bugs 
> and we should try to enforce that every type serializer (that is not 
> exclusive to tests) is tested at least through a test that extends the 
> {{SerializerTestBase}}. We should also check that the tests of composite 
> serializers use inner serializers that are stateful and require proper 
> duplication, or else the tests can pass even if inner serializers are not 
> properly duplicated.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-10649) Update FlinkKafkaConsumerBaseMigrationTest for 1.7

2018-11-09 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai closed FLINK-10649.
---
Resolution: Duplicate

> Update FlinkKafkaConsumerBaseMigrationTest for 1.7
> --
>
> Key: FLINK-10649
> URL: https://issues.apache.org/jira/browse/FLINK-10649
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.8.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10778) Update TypeSerializerSnapshotMigrationTestBase and subclasses for 1.7

2018-11-09 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10778?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-10778:

Affects Version/s: 1.8.0

> Update TypeSerializerSnapshotMigrationTestBase and subclasses for 1.7
> -
>
> Key: FLINK-10778
> URL: https://issues.apache.org/jira/browse/FLINK-10778
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.8.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Major
>
> Update {{TypeSerializerSnapshotMigrationTestBase}} and subclasses to cover 
> restoring from Flink 1.7.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10777) Update TypeSerializerSnapshotMigrationITCase for Flink 1.7

2018-11-09 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-10777:

Fix Version/s: 1.8.0

> Update TypeSerializerSnapshotMigrationITCase for Flink 1.7
> --
>
> Key: FLINK-10777
> URL: https://issues.apache.org/jira/browse/FLINK-10777
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.8.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Major
> Fix For: 1.8.0
>
>
> Update {{TypeSerializerSnapshotMigrationITCase}} to cover restoring from 
> Flink 1.7.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-10651) Update CEPMigrationTest for 1.7

2018-11-09 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10651?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai closed FLINK-10651.
---
Resolution: Duplicate

> Update CEPMigrationTest for 1.7
> ---
>
> Key: FLINK-10651
> URL: https://issues.apache.org/jira/browse/FLINK-10651
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.8.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-10652) Update StatefulJobWBroadcastStateMigrationITCase for 1.7

2018-11-09 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai closed FLINK-10652.
---
Resolution: Duplicate

> Update StatefulJobWBroadcastStateMigrationITCase for 1.7
> 
>
> Key: FLINK-10652
> URL: https://issues.apache.org/jira/browse/FLINK-10652
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.8.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10777) Update TypeSerializerSnapshotMigrationITCase for Flink 1.7

2018-11-09 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-10777:

Affects Version/s: 1.8.0

> Update TypeSerializerSnapshotMigrationITCase for Flink 1.7
> --
>
> Key: FLINK-10777
> URL: https://issues.apache.org/jira/browse/FLINK-10777
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.8.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Major
> Fix For: 1.8.0
>
>
> Update {{TypeSerializerSnapshotMigrationITCase}} to cover restoring from 
> Flink 1.7.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10779) Update Java / Scala StatefulJobSavepointMigrationITCase for 1.7

2018-11-09 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10779?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-10779:

Fix Version/s: 1.8.0

> Update Java / Scala StatefulJobSavepointMigrationITCase for 1.7
> ---
>
> Key: FLINK-10779
> URL: https://issues.apache.org/jira/browse/FLINK-10779
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.8.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Hequn Cheng
>Priority: Major
> Fix For: 1.8.0
>
>
> Update Java / Scala {{StatefulJobSavepointMigrationITCase}} so that it covers 
> recovering from Flink 1.7.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10778) Update TypeSerializerSnapshotMigrationTestBase and subclasses for 1.7

2018-11-09 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10778?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-10778:

Fix Version/s: 1.8.0

> Update TypeSerializerSnapshotMigrationTestBase and subclasses for 1.7
> -
>
> Key: FLINK-10778
> URL: https://issues.apache.org/jira/browse/FLINK-10778
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.8.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Major
> Fix For: 1.8.0
>
>
> Update {{TypeSerializerSnapshotMigrationTestBase}} and subclasses to cover 
> restoring from Flink 1.7.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10779) Update Java / Scala StatefulJobSavepointMigrationITCase for 1.7

2018-11-09 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10779?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-10779:

Affects Version/s: 1.8.0

> Update Java / Scala StatefulJobSavepointMigrationITCase for 1.7
> ---
>
> Key: FLINK-10779
> URL: https://issues.apache.org/jira/browse/FLINK-10779
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.8.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Hequn Cheng
>Priority: Major
> Fix For: 1.8.0
>
>
> Update Java / Scala {{StatefulJobSavepointMigrationITCase}} so that it covers 
> recovering from Flink 1.7.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10780) Update Java / Scala StatefulJobWBroadcastStateMigrationITCase for 1.7

2018-11-09 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-10780:

Fix Version/s: 1.8.0

> Update Java / Scala StatefulJobWBroadcastStateMigrationITCase for 1.7
> -
>
> Key: FLINK-10780
> URL: https://issues.apache.org/jira/browse/FLINK-10780
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.8.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Hequn Cheng
>Priority: Major
> Fix For: 1.8.0
>
>
> Update Java / Scala {{StatefulJobWBroadcastStateMigrationITCase}} so that it 
> covers restoring from Flink 1.7.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10630) Update migration tests for Flink 1.7

2018-11-09 Thread Stefan Richter (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16681074#comment-16681074
 ] 

Stefan Richter commented on FLINK-10630:


There is no work for 1.8 from my side, because this was originally targeted for 
1.7, not 1.8. That was a planning mistake, because the work was already done.

> Update migration tests for Flink 1.7
> 
>
> Key: FLINK-10630
> URL: https://issues.apache.org/jira/browse/FLINK-10630
> Project: Flink
>  Issue Type: Task
>  Components: Tests
>Affects Versions: 1.8.0
>Reporter: Till Rohrmann
>Assignee: Stefan Richter
>Priority: Critical
> Fix For: 1.8.0
>
>
> Similar to FLINK-10084, we need to update the migration tests for the latest 
> Flink version.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10781) Update BucketingSinkMigrationTest for Flink 1.7

2018-11-09 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10781?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-10781:

Fix Version/s: 1.8.0

> Update BucketingSinkMigrationTest for Flink 1.7
> ---
>
> Key: FLINK-10781
> URL: https://issues.apache.org/jira/browse/FLINK-10781
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.8.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Hequn Cheng
>Priority: Major
> Fix For: 1.8.0
>
>
> Update {{BucketingSinkMigrationTest}} so that it covers restoring from 1.7.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10781) Update BucketingSinkMigrationTest for Flink 1.7

2018-11-09 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10781?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-10781:

Affects Version/s: 1.8.0

> Update BucketingSinkMigrationTest for Flink 1.7
> ---
>
> Key: FLINK-10781
> URL: https://issues.apache.org/jira/browse/FLINK-10781
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.8.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Hequn Cheng
>Priority: Major
> Fix For: 1.8.0
>
>
> Update {{BucketingSinkMigrationTest}} so that it covers restoring from 1.7.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9702) Improvement in (de)serialization of keys and values for RocksDB state

2018-11-09 Thread Stefan Richter (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16681078#comment-16681078
 ] 

Stefan Richter commented on FLINK-9702:
---

I think it still makes sense; please consider the link in my previous comment. 
I already had an almost complete implementation that I guess just needs some 
rebasing, so there is no need to start from scratch.

> Improvement in (de)serialization of keys and values for RocksDB state
> -
>
> Key: FLINK-9702
> URL: https://issues.apache.org/jira/browse/FLINK-9702
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Stefan Richter
>Assignee: Congxian Qiu
>Priority: Major
>
> When Flink interacts with state in RocksDB, object (de)serialization often 
> contributes significantly to the performance overhead. I think there are some 
> aspects that we can improve here to reduce the costs in this area. In 
> particular, every state currently has to serialize the backend's current key 
> before each state access. We could reduce this effort by sharing the 
> serialized key bytes across all state interactions. Furthermore, we can 
> reduce the number of `byte[]` instances and streams/views involved.
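>
> A hedged sketch of the key-byte sharing idea (illustrative, not Flink's actual API):
> {code:scala}
> // Cache the serialized form of the backend's current key so that repeated
> // state accesses under the same key reuse the bytes instead of
> // re-serializing on every access.
> class SerializedKeyCache[K](serializeKey: K => Array[Byte]) {
>   private var cachedKey: Option[K] = None
>   private var cachedBytes: Array[Byte] = Array.emptyByteArray
>
>   def keyBytes(key: K): Array[Byte] = {
>     if (!cachedKey.contains(key)) {
>       cachedBytes = serializeKey(key) // serialize only when the key changes
>       cachedKey = Some(key)
>     }
>     cachedBytes
>   }
> }
> {code}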



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zentol commented on issue #7066: [FLINK-10826][test] Decrease deployment size of heavy deployment e2e t…

2018-11-09 Thread GitBox
zentol commented on issue #7066: [FLINK-10826][test] Decrease deployment size 
of heavy deployment e2e t…
URL: https://github.com/apache/flink/pull/7066#issuecomment-437290799
 
 
   You may want to increase the network timeout like we did for the 
high-parallelism iterations test. 
https://github.com/apache/flink/commit/ecd4567bcf5403f26c46d212d6e1698b3eeadbfb
   
   Jobs that take too long (25+ seconds) to fully deploy may otherwise fail.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10826) Heavy deployment end-to-end test produces no output on Travis

2018-11-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16681080#comment-16681080
 ] 

ASF GitHub Bot commented on FLINK-10826:


zentol commented on issue #7066: [FLINK-10826][test] Decrease deployment size 
of heavy deployment e2e t…
URL: https://github.com/apache/flink/pull/7066#issuecomment-437290799
 
 
   You may want to increase the network timeout like we did for the 
high-parallelism iterations test. 
https://github.com/apache/flink/commit/ecd4567bcf5403f26c46d212d6e1698b3eeadbfb
   
   Jobs that take too long (25+ seconds) to fully deploy may otherwise fail.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Heavy deployment end-to-end test produces no output on Travis
> -
>
> Key: FLINK-10826
> URL: https://issues.apache.org/jira/browse/FLINK-10826
> Project: Flink
>  Issue Type: Bug
>  Components: E2E Tests
>Affects Versions: 1.7.0
>Reporter: Timo Walther
>Assignee: Stefan Richter
>Priority: Critical
>  Labels: pull-request-available
> Attachments: heavy_deployement_log.txt
>
>
> The Heavy deployment end-to-end test produces no output on Travis such that 
> it is killed after 10 minutes.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zentol commented on a change in pull request #7045: [hotfix] Update nightly master cron jobs

2018-11-09 Thread GitBox
zentol commented on a change in pull request #7045: [hotfix] Update nightly 
master cron jobs
URL: https://github.com/apache/flink/pull/7045#discussion_r231874926
 
 

 ##
 File path: nightly.sh
 ##
 @@ -43,7 +43,7 @@ mkdir -p $ARTIFACTS_DIR || { echo "FAILURE: cannot create 
log directory '${ARTIF
 LOG4J_PROPERTIES=${FLINK_DIR}/tools/log4j-travis.properties
 
 MVN_LOGGING_OPTIONS="-Dlog.dir=${ARTIFACTS_DIR} 
-Dlog4j.configuration=file://$LOG4J_PROPERTIES 
-Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn"
-MVN_COMMON_OPTIONS="-nsu -B -Dflink.forkCount=2 -Dflink.forkCountTestPackage=2 
-Dfast"
+MVN_COMMON_OPTIONS="-T1C -nsu -B -Dflink.forkCount=2 
-Dflink.forkCountTestPackage=2 -Dfast"
 
 Review comment:
   otherwise we're executing (java) e2e tests in parallel


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-10648) Update ContinuousFileProcessingMigrationTest for 1.7

2018-11-09 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai closed FLINK-10648.
---
Resolution: Duplicate

> Update ContinuousFileProcessingMigrationTest for 1.7
> 
>
> Key: FLINK-10648
> URL: https://issues.apache.org/jira/browse/FLINK-10648
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.8.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10780) Update Java / Scala StatefulJobWBroadcastStateMigrationITCase for 1.7

2018-11-09 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-10780:

Affects Version/s: 1.8.0

> Update Java / Scala StatefulJobWBroadcastStateMigrationITCase for 1.7
> -
>
> Key: FLINK-10780
> URL: https://issues.apache.org/jira/browse/FLINK-10780
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.8.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Hequn Cheng
>Priority: Major
> Fix For: 1.8.0
>
>
> Update Java / Scala {{StatefulJobWBroadcastStateMigrationITCase}} so that it 
> covers restoring from Flink 1.7.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] pnowojski closed pull request #7060: [FLINK-10801][e2e] Retry verify_result_hash in elasticsearch-common

2018-11-09 Thread GitBox
pnowojski closed pull request #7060: [FLINK-10801][e2e] Retry 
verify_result_hash in elasticsearch-common
URL: https://github.com/apache/flink/pull/7060
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-end-to-end-tests/test-scripts/common.sh 
b/flink-end-to-end-tests/test-scripts/common.sh
index 36cc0df142c..6b8bd061f15 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -451,6 +451,16 @@ function cancel_job {
 }
 
 function check_result_hash {
+  local error_code=0
+  check_result_hash_no_exit "$@" || error_code=$?
+
+  if [ "$error_code" != "0" ]
+  then
+exit $error_code
+  fi
+}
+
+function check_result_hash_no_exit {
   local name=$1
   local outfile_prefix=$2
   local expected=$3
@@ -462,18 +472,19 @@ function check_result_hash {
 actual=$(LC_ALL=C sort $outfile_prefix* | md5sum | awk '{print $1}')
   else
 echo "Neither 'md5' nor 'md5sum' binary available."
-exit 2
+return 2
   fi
   if [[ "$actual" != "$expected" ]]
   then
 echo "FAIL $name: Output hash mismatch.  Got $actual, expected $expected."
 echo "head hexdump of actual:"
 head $outfile_prefix* | hexdump -c
-exit 1
+return 1
   else
 echo "pass $name"
 # Output files are left behind in /tmp
   fi
+  return 0
 }
 
 # This function starts the given number of task managers and monitors their 
processes.
diff --git a/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh 
b/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh
index 8bb9c42ab69..695905222f5 100644
--- a/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh
+++ b/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh
@@ -83,23 +83,34 @@ function verify_result_hash {
   local name=$1
   local index=$2
   local numRecords=$3
-  local hash=$4
+  local expectedHash=$4
 
-  while : ; do
+  for i in {1..30}
+  do
+local error_code=0
+
+echo "Result verification attempt $i..."
 curl "localhost:9200/${index}/_search?q=*&pretty" > 
$TEST_DATA_DIR/es_output || true
 
-if [ -n "$(grep "\"total\" : $numRecords" $TEST_DATA_DIR/es_output)" ]; 
then
-  break
-else
-  echo "Waiting for Elasticsearch records ..."
+# remove meta information
+sed '2,9d' $TEST_DATA_DIR/es_output > $TEST_DATA_DIR/es_content
+
+check_result_hash_no_exit "$name" $TEST_DATA_DIR/es_content 
"$expectedHash" || error_code=$?
+
+if [ "$error_code" != "0" ]
+then
+  echo "Result verification attempt $i has failed"
   sleep 1
+else
+  break
 fi
   done
 
-  # remove meta information
-  sed '2,9d' $TEST_DATA_DIR/es_output > $TEST_DATA_DIR/es_content
-
-  check_result_hash "$name" $TEST_DATA_DIR/es_content "$hash"
+  if [ "$error_code" != "0" ]
+  then
+echo "All result verification attempts have failed"
+exit $error_code
+  fi
 }
 
 function shutdown_elasticsearch_cluster {


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10801) Fix sql client integrate elasticsearch connector test failure

2018-11-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16681084#comment-16681084
 ] 

ASF GitHub Bot commented on FLINK-10801:


pnowojski closed pull request #7060: [FLINK-10801][e2e] Retry 
verify_result_hash in elasticsearch-common
URL: https://github.com/apache/flink/pull/7060
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-end-to-end-tests/test-scripts/common.sh 
b/flink-end-to-end-tests/test-scripts/common.sh
index 36cc0df142c..6b8bd061f15 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -451,6 +451,16 @@ function cancel_job {
 }
 
 function check_result_hash {
+  local error_code=0
+  check_result_hash_no_exit "$@" || error_code=$?
+
+  if [ "$error_code" != "0" ]
+  then
+exit $error_code
+  fi
+}
+
+function check_result_hash_no_exit {
   local name=$1
   local outfile_prefix=$2
   local expected=$3
@@ -462,18 +472,19 @@ function check_result_hash {
 actual=$(LC_ALL=C sort $outfile_prefix* | md5sum | awk '{print $1}')
   else
 echo "Neither 'md5' nor 'md5sum' binary available."
-exit 2
+return 2
   fi
   if [[ "$actual" != "$expected" ]]
   then
 echo "FAIL $name: Output hash mismatch.  Got $actual, expected $expected."
 echo "head hexdump of actual:"
 head $outfile_prefix* | hexdump -c
-exit 1
+return 1
   else
 echo "pass $name"
 # Output files are left behind in /tmp
   fi
+  return 0
 }
 
 # This function starts the given number of task managers and monitors their 
processes.
diff --git a/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh 
b/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh
index 8bb9c42ab69..695905222f5 100644
--- a/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh
+++ b/flink-end-to-end-tests/test-scripts/elasticsearch-common.sh
@@ -83,23 +83,34 @@ function verify_result_hash {
   local name=$1
   local index=$2
   local numRecords=$3
-  local hash=$4
+  local expectedHash=$4
 
-  while : ; do
+  for i in {1..30}
+  do
+local error_code=0
+
+echo "Result verification attempt $i..."
 curl "localhost:9200/${index}/_search?q=*&pretty" > 
$TEST_DATA_DIR/es_output || true
 
-if [ -n "$(grep "\"total\" : $numRecords" $TEST_DATA_DIR/es_output)" ]; 
then
-  break
-else
-  echo "Waiting for Elasticsearch records ..."
+# remove meta information
+sed '2,9d' $TEST_DATA_DIR/es_output > $TEST_DATA_DIR/es_content
+
+check_result_hash_no_exit "$name" $TEST_DATA_DIR/es_content 
"$expectedHash" || error_code=$?
+
+if [ "$error_code" != "0" ]
+then
+  echo "Result verification attempt $i has failed"
   sleep 1
+else
+  break
 fi
   done
 
-  # remove meta information
-  sed '2,9d' $TEST_DATA_DIR/es_output > $TEST_DATA_DIR/es_content
-
-  check_result_hash "$name" $TEST_DATA_DIR/es_content "$hash"
+  if [ "$error_code" != "0" ]
+  then
+echo "All result verification attempts have failed"
+exit $error_code
+  fi
 }
 
 function shutdown_elasticsearch_cluster {


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Fix sql client integrate elasticsearch connector test failure
> -
>
> Key: FLINK-10801
> URL: https://issues.apache.org/jira/browse/FLINK-10801
> Project: Flink
>  Issue Type: Bug
>  Components: E2E Tests
>Reporter: vinoyang
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>
> It usually reports:
> {code:java}
> FAIL SQL Client Elasticsearch Upsert: Output hash mismatch. Got 
> 6187222e109ee9222e6b2f117742070c, expected 982cb32908def9801e781381c1b8a8db.
> head hexdump of actual:
> 000 { \n " h i t s " : { \n 
> 010 " t o t a l " : 3 , \n
> 020 " m a x _ s c o r e " 
> 030 : 1 . 0 , \n " h i t s
> 040 " : [ \n { \n 
> 050 " _ i n d e x " :
> 060 " m y _ u s e r s " , \n 
> 070 " _ t y p e " : "
> 080 u s e r " , \n "
> 090 _ i d " : " 1 _ B o b "
> 0a0 , \n " _ s c o r
> 0b0 e " : 1 . 0 , \n 
> 0ba
> {code}
> the actual hash corresponds to:
> {code:java}
> {
>   "hits" : {
> "total" : 3,
> "max_score" : 1.0,
> "hits" : [
>   {
> "_index" : "my_users",
> "_type" : "use

[GitHub] zentol commented on issue #7061: [FLINK-10827][tests] Add test for duplicate() to SerializerTestBase

2018-11-09 Thread GitBox
zentol commented on issue #7061: [FLINK-10827][tests] Add test for duplicate() 
to SerializerTestBase
URL: https://github.com/apache/flink/pull/7061#issuecomment-437292479
 
 
   Related test-failure:
   ```
   
testDuplicate(org.apache.flink.api.java.typeutils.runtime.PojoSubclassSerializerTest)
  Time elapsed: 0.101 sec  <<< ERROR!
   java.lang.IllegalStateException: Concurrent access to KryoSerializer. Thread 
1: Thread-240 , Thread 2: Thread-241
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.enterExclusiveThread(KryoSerializer.java:623)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:211)
at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:256)
at 
org.apache.flink.api.common.typeutils.SerializerTestBase$SerializerRunner.run(SerializerTestBase.java:597)
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10827) Add test for duplicate() to SerializerTestBase

2018-11-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10827?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16681085#comment-16681085
 ] 

ASF GitHub Bot commented on FLINK-10827:


zentol commented on issue #7061: [FLINK-10827][tests] Add test for duplicate() 
to SerializerTestBase
URL: https://github.com/apache/flink/pull/7061#issuecomment-437292479
 
 
   Related test-failure:
   ```
   
testDuplicate(org.apache.flink.api.java.typeutils.runtime.PojoSubclassSerializerTest)
  Time elapsed: 0.101 sec  <<< ERROR!
   java.lang.IllegalStateException: Concurrent access to KryoSerializer. Thread 
1: Thread-240 , Thread 2: Thread-241
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.enterExclusiveThread(KryoSerializer.java:623)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:211)
at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:256)
at 
org.apache.flink.api.common.typeutils.SerializerTestBase$SerializerRunner.run(SerializerTestBase.java:597)
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add test for duplicate() to SerializerTestBase
> --
>
> Key: FLINK-10827
> URL: https://issues.apache.org/jira/browse/FLINK-10827
> Project: Flink
>  Issue Type: Test
>  Components: Tests
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> In the past, we had many bugs from type serializers that did not implement 
> the {{duplicate()}} method properly. A very common error is forgetting to 
> create a deep copy of some fields, which can lead to concurrency problems in 
> the backend.
> We should add a test case that exercises duplicated serializers from 
> different threads to expose concurrency problems.
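>
> A hedged sketch of the failure mode (illustrative, not Flink's actual {{TypeSerializer}}):
> {code:scala}
> import java.io.ByteArrayOutputStream
>
> // duplicate() must deep-copy mutable fields; sharing them (or returning
> // `this`) lets two threads race on the same buffer.
> class BufferingSerializer(val buffer: ByteArrayOutputStream) {
>   def duplicateBroken(): BufferingSerializer =
>     this // shares the mutable buffer across "duplicates"
>   def duplicateCorrect(): BufferingSerializer =
>     new BufferingSerializer(new ByteArrayOutputStream()) // fresh deep state
> }
> {code}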



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-10801) Fix sql client integrate elasticsearch connector test failure

2018-11-09 Thread Piotr Nowojski (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10801?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-10801.
--
   Resolution: Fixed
Fix Version/s: 1.7.0

merged commit eaa2959 into apache:master
merged commit 158e2c89e1 into apache:release-1.7

> Fix sql client integrate elasticsearch connector test failure
> -
>
> Key: FLINK-10801
> URL: https://issues.apache.org/jira/browse/FLINK-10801
> Project: Flink
>  Issue Type: Bug
>  Components: E2E Tests
>Reporter: vinoyang
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> It usually reports:
> {code:java}
> FAIL SQL Client Elasticsearch Upsert: Output hash mismatch. Got 
> 6187222e109ee9222e6b2f117742070c, expected 982cb32908def9801e781381c1b8a8db.
> head hexdump of actual:
> 000 { \n " h i t s " : { \n 
> 010 " t o t a l " : 3 , \n
> 020 " m a x _ s c o r e " 
> 030 : 1 . 0 , \n " h i t s
> 040 " : [ \n { \n 
> 050 " _ i n d e x " :
> 060 " m y _ u s e r s " , \n 
> 070 " _ t y p e " : "
> 080 u s e r " , \n "
> 090 _ i d " : " 1 _ B o b "
> 0a0 , \n " _ s c o r
> 0b0 e " : 1 . 0 , \n 
> 0ba
> {code}
> the actual hash corresponds to:
> {code:java}
> {
>   "hits" : {
> "total" : 3,
> "max_score" : 1.0,
> "hits" : [
>   {
> "_index" : "my_users",
> "_type" : "user",
> "_id" : "1_Bob  ",
> "_score" : 1.0,
> "_source" : {
>   "user_id" : 1,
>   "user_name" : "Bob  ",
>   "user_count" : 1
> }
>   },
>   {
> "_index" : "my_users",
> "_type" : "user",
> "_id" : "22_Alice",
> "_score" : 1.0,
> "_source" : {
>   "user_id" : 22,
>   "user_name" : "Alice",
>   "user_count" : 1
> }
>   },
>   {
> "_index" : "my_users",
> "_type" : "user",
> "_id" : "42_Greg ",
> "_score" : 1.0,
> "_source" : {
>   "user_id" : 42,
>   "user_name" : "Greg ",
>   "user_count" : 3
> }
>   }
> ]
>   }
> }
> {code}
> The expected hash corresponds to this output: 
> {code:java}
> {
>   "hits" : {
> "total" : 3,
> "max_score" : 1.0,
> "hits" : [
>   {
> "_index" : "my_users",
> "_type" : "user",
> "_id" : "1_Bob  ",
> "_score" : 1.0,
> "_source" : {
>   "user_id" : 1,
>   "user_name" : "Bob  ",
>   "user_count" : 2
> }
>   },
>   {
> "_index" : "my_users",
> "_type" : "user",
> "_id" : "22_Alice",
> "_score" : 1.0,
> "_source" : {
>   "user_id" : 22,
>   "user_name" : "Alice",
>   "user_count" : 1
> }
>   },
>   {
> "_index" : "my_users",
> "_type" : "user",
> "_id" : "42_Greg ",
> "_score" : 1.0,
> "_source" : {
>   "user_id" : 42,
>   "user_name" : "Greg ",
>   "user_count" : 3
> }
>   }
> ]
>   }
> }
> {code}
> It seems that the user count for "Bob" is off by 1.
> The suspected cause is that the aggregated statistics are fetched from 
> Elasticsearch prematurely.
>  
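
For illustration, one common way to avoid reading the aggregate too early in a 
test is to poll until the expected state is reached (a hedged sketch with a 
hypothetical {{queryUserCount}} helper, not the actual fix in this issue):

{code:java}
// Poll Elasticsearch until the aggregated value stabilizes at the expected
// count (or a deadline passes), instead of reading it immediately.
long deadline = System.currentTimeMillis() + 30_000;
while (System.currentTimeMillis() < deadline) {
    long observedCount = queryUserCount("1_Bob  "); // hypothetical test helper
    if (observedCount == 2) {
        break; // the index has caught up; safe to hash the output now
    }
    Thread.sleep(500);
}
{code}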



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10837) Kafka 2.0 test KafkaITCase.testOneToOneSources dead locked on deleteTestTopic?

2018-11-09 Thread Piotr Nowojski (JIRA)
Piotr Nowojski created FLINK-10837:
--

 Summary: Kafka 2.0 test KafkaITCase.testOneToOneSources dead 
locked on deleteTestTopic?
 Key: FLINK-10837
 URL: https://issues.apache.org/jira/browse/FLINK-10837
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.7.0
Reporter: Piotr Nowojski


https://api.travis-ci.org/v3/job/452439034/log.txt

{noformat}
Tests in error: 
  
KafkaITCase.testOneToOneSources:97->KafkaConsumerTestBase.runOneToOneExactlyOnceTest:875->KafkaTestBase.deleteTestTopic:206->Object.wait:502->Object.wait:-2
 » TestTimedOut
{noformat}

{noformat}
java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at java.lang.Object.wait(Object.java:502)
at 
org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:92)
at 
org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:262)
at 
org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.deleteTestTopic(KafkaTestEnvironmentImpl.java:158)
at 
org.apache.flink.streaming.connectors.kafka.KafkaTestBase.deleteTestTopic(KafkaTestBase.java:206)
at 
org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.runOneToOneExactlyOnceTest(KafkaConsumerTestBase.java:875)
at 
org.apache.flink.streaming.connectors.kafka.KafkaITCase.testOneToOneSources(KafkaITCase.java:97)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)
{noformat}




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] StefanRRichter commented on issue #7061: [FLINK-10827][tests] Add test for duplicate() to SerializerTestBase

2018-11-09 Thread GitBox
StefanRRichter commented on issue #7061: [FLINK-10827][tests] Add test for 
duplicate() to SerializerTestBase
URL: https://github.com/apache/flink/pull/7061#issuecomment-437293862
 
 
   Thanks, the failure is great because we probably caught another offender.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10827) Add test for duplicate() to SerializerTestBase

2018-11-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10827?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16681090#comment-16681090
 ] 

ASF GitHub Bot commented on FLINK-10827:


StefanRRichter commented on issue #7061: [FLINK-10827][tests] Add test for 
duplicate() to SerializerTestBase
URL: https://github.com/apache/flink/pull/7061#issuecomment-437293862
 
 
   Thanks, the failure is great because we probably caught another offender.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add test for duplicate() to SerializerTestBase
> --
>
> Key: FLINK-10827
> URL: https://issues.apache.org/jira/browse/FLINK-10827
> Project: Flink
>  Issue Type: Test
>  Components: Tests
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> In the past, we had many bugs from type serializers that did not implement 
> the {{duplicate()}} method properly. A very common error is forgetting to 
> create a deep copy of some fields, which can lead to concurrency problems in 
> the backend.
> We should add a test case that uses duplicated serializers from different 
> threads to expose concurrency problems.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yanghua commented on issue #6927: [FLINK-10624] Extend SQL client end-to-end to test new KafkaTableSink

2018-11-09 Thread GitBox
yanghua commented on issue #6927: [FLINK-10624] Extend SQL client end-to-end to 
test new KafkaTableSink
URL: https://github.com/apache/flink/pull/6927#issuecomment-437294266
 
 
   @pnowojski Travis is currently passing. I see that you have merged the fix 
for Elasticsearch; I will merge your fix and verify it again.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10624) Extend SQL client end-to-end to test new KafkaTableSink

2018-11-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16681093#comment-16681093
 ] 

ASF GitHub Bot commented on FLINK-10624:


yanghua commented on issue #6927: [FLINK-10624] Extend SQL client end-to-end to 
test new KafkaTableSink
URL: https://github.com/apache/flink/pull/6927#issuecomment-437294266
 
 
   @pnowojski Travis is currently passing. I see that you have merged the fix 
for Elasticsearch; I will merge your fix and verify it again.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Extend SQL client end-to-end to test new KafkaTableSink
> ---
>
> Key: FLINK-10624
> URL: https://issues.apache.org/jira/browse/FLINK-10624
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector, Table API & SQL, Tests
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> With FLINK-9697, we added support for Kafka 2.0. We should extend the 
> existing streaming client end-to-end test to also test the new 
> {{KafkaTableSink}} against Kafka 2.0.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yanghua edited a comment on issue #6927: [FLINK-10624] Extend SQL client end-to-end to test new KafkaTableSink

2018-11-09 Thread GitBox
yanghua edited a comment on issue #6927: [FLINK-10624] Extend SQL client 
end-to-end to test new KafkaTableSink
URL: https://github.com/apache/flink/pull/6927#issuecomment-437294266
 
 
   @pnowojski Travis is currently passing. I see that you have merged the fix 
for Elasticsearch; I will merge your fix and verify it again.
   
   You can also check whether there are other places that need to be modified; 
I will fix them together.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10624) Extend SQL client end-to-end to test new KafkaTableSink

2018-11-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16681100#comment-16681100
 ] 

ASF GitHub Bot commented on FLINK-10624:


yanghua edited a comment on issue #6927: [FLINK-10624] Extend SQL client 
end-to-end to test new KafkaTableSink
URL: https://github.com/apache/flink/pull/6927#issuecomment-437294266
 
 
   @pnowojski Travis is currently passing. I see that you have merged the fix 
for Elasticsearch; I will merge your fix and verify it again.
   
   You can also check whether there are other places that need to be modified; 
I will fix them together.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Extend SQL client end-to-end to test new KafkaTableSink
> ---
>
> Key: FLINK-10624
> URL: https://issues.apache.org/jira/browse/FLINK-10624
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector, Table API & SQL, Tests
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> With FLINK-9697, we added support for Kafka 2.0. We should extend the 
> existing streaming client end-to-end test to also test the new 
> {{KafkaTableSink}} against Kafka 2.0.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] Clarkkkkk commented on issue #7051: [FLINK-10820][network] Simplify the RebalancePartitioner implementation

2018-11-09 Thread GitBox
Clarkkkkk commented on issue #7051: [FLINK-10820][network] Simplify the 
RebalancePartitioner implementation
URL: https://github.com/apache/flink/pull/7051#issuecomment-437295933
 
 
   Code LGTM. Since RebalancePartitioner and RoundRobinPartitioner are 
duplicates right now, I would vote for keeping RoundRobinPartitioner, whose 
name is more intuitive.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10820) Simplify the RebalancePartitioner implementation

2018-11-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16681106#comment-16681106
 ] 

ASF GitHub Bot commented on FLINK-10820:


Clarkkkkk commented on issue #7051: [FLINK-10820][network] Simplify the 
RebalancePartitioner implementation
URL: https://github.com/apache/flink/pull/7051#issuecomment-437295933
 
 
   Code LGTM. Since RebalancePartitioner and RoundRobinPartitioner are 
duplicates right now, I would vote for keeping RoundRobinPartitioner, whose 
name is more intuitive.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Simplify the RebalancePartitioner implementation
> 
>
> Key: FLINK-10820
> URL: https://issues.apache.org/jira/browse/FLINK-10820
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.8.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
>
> The current {{RebalancePartitioner}} implementation seems a little hacky: it 
> selects a random number as the first channel index, and the following 
> selections proceed from this random index in round-robin fashion.
> We can define a constant as the first channel index to make the 
> implementation simple and readable. Doing so does not change the rebalance 
> semantics (see the sketch below).
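
For illustration, a minimal sketch of the constant-start round-robin selection 
described above (a hypothetical shape, not the actual Flink code):

{code:java}
// Round-robin channel selection starting from a fixed index instead of a
// random one; every channel is still used equally often, so the rebalance
// semantics are unchanged.
class RoundRobinChannelSelector {
    private int nextChannel = -1; // constant starting point, no Random needed

    int selectChannel(int numberOfChannels) {
        nextChannel = (nextChannel + 1) % numberOfChannels;
        return nextChannel;
    }
}
{code}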



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10834) TableAPI flatten() calculated value error

2018-11-09 Thread Fabian Hueske (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16681180#comment-16681180
 ] 

Fabian Hueske commented on FLINK-10834:
---

Do you know what the SQL standard defines for non-deterministic functions (or 
how other DBMS handle the situation)?
Should they be evaluated just once per row?

If not, then this behavior might actually be fine.

This might also be a Calcite issue.

> TableAPI flatten() calculated value error
> -
>
> Key: FLINK-10834
> URL: https://issues.apache.org/jira/browse/FLINK-10834
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: Hequn Cheng
>Priority: Major
> Fix For: 1.7.1
>
>
> We have a UDF as follows:
> {code:java}
> object FuncRow extends ScalarFunction {
> def eval(v: Int): Row = { 
>   val version = "" + new Random().nextInt()          
>   val row = new Row(3)          
>   row.setField(0, version)          
>   row.setField(1, version)          
>   row.setField(2, version)          
>   row 
> }
> override def isDeterministic: Boolean = false
> override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
>  Types.ROW(Types.STRING, Types.STRING, Types.STRING)
> }
> {code}
> Do the following Query:
> {code:sql}
> val data = new mutable.MutableList[(Int, Long, String)]
>  data.+=((1, 1L, "Hi"))
>  val ds = env.fromCollection(data).toTable(tEnv, 'a, 'b,'c)
>  .select(FuncRow('a).flatten()).as('v1, 'v2, 'v3)
> {code}
> The result is: -1189206469,-151367792,1988676906
> The result expected by the user is v1 == v2 == v3.
> It looks like the real reason is that the UDF result is not reused in the 
> generated code.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10662) Refactor the ChannelSelector interface for single selected channel

2018-11-09 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16681198#comment-16681198
 ] 

zhijiang commented on FLINK-10662:
--

[~pnowojski], I am currently working on this. If we change 
{{ChannelSelector#selectChannels}} to return a single {{int}}, there are a few 
choices for the special {{BroadcastPartitioner}} implementation.
 # Return an arbitrary int such as -1 for {{BroadcastPartitioner}}, because 
this value would not actually be used on the {{RecordWriter}} side. We can add 
a shortcut branch for {{BroadcastPartitioner}} in {{RecordWriter}} (see the 
sketch below). This is easy to implement, but it may seem a little hacky to 
return a dummy value from the {{selectChannels}} method.
 # Return a `tuple` from {{ChannelSelector#selectChannels}}, where the first 
boolean value indicates whether it is a broadcast or not. If it is, we ignore 
the second element, the selected channel index. I am wondering whether this 
would bring more overhead than the first way above.
 # Define a high-level interface called {{ChannelSelectorBase}} with no 
specific methods; {{ChannelSelector}} then extends {{ChannelSelectorBase}}, and 
{{BroadcastPartitioner}} implements {{ChannelSelectorBase}} directly, because 
it does not need any method to return selected channels. In this way, we 
change the current class structure and also need to adjust the API in 
{{DataStream}} to reference {{ChannelSelectorBase}} instead of 
{{StreamPartitioner}}. It would be more complex to handle all the related 
references.

Currently, I would prefer the first way although it seems a little hacky, or do 
you have other better suggestions? :)
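
For illustration, a rough sketch of the first option (hypothetical shapes and 
names, not the actual Flink interfaces):

{code:java}
// Option 1 sketch: BroadcastPartitioner returns a dummy channel index and the
// RecordWriter short-circuits on isBroadcast() instead of using that index.
interface ChannelSelector<T> {
    int selectChannel(T record, int numberOfChannels);
    boolean isBroadcast();
}

class BroadcastPartitioner<T> implements ChannelSelector<T> {
    public int selectChannel(T record, int numberOfChannels) {
        return -1; // dummy value, never used by the RecordWriter
    }
    public boolean isBroadcast() {
        return true;
    }
}

class RecordWriter<T> {
    private final ChannelSelector<T> selector;
    private final int numberOfChannels;

    RecordWriter(ChannelSelector<T> selector, int numberOfChannels) {
        this.selector = selector;
        this.numberOfChannels = numberOfChannels;
    }

    void emit(T record) {
        if (selector.isBroadcast()) {
            // shortcut branch: write the record to every channel
            for (int channel = 0; channel < numberOfChannels; channel++) {
                sendTo(channel, record);
            }
        } else {
            sendTo(selector.selectChannel(record, numberOfChannels), record);
        }
    }

    private void sendTo(int channel, T record) {
        // serialize the record and write it to the given channel (omitted)
    }
}
{code}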

> Refactor the ChannelSelector interface for single selected channel
> --
>
> Key: FLINK-10662
> URL: https://issues.apache.org/jira/browse/FLINK-10662
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.5.4, 1.6.1
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>
> In the discussion of broadcast improvement, [~pnowojski] pointed out the 
> issue of improving the current channel selector.
>  
> In {{ChannelSelector#selectChannels}}, an array of selected channels is 
> returned. But considering the specific implementations, only 
> {{BroadcastPartitioner}} selects all the channels, while the other 
> implementations select a single channel. So we can simplify this interface 
> to return a single channel index to benefit performance, and specialize 
> {{BroadcastPartitioner}} in a more efficient way.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-10662) Refactor the ChannelSelector interface for single selected channel

2018-11-09 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16681198#comment-16681198
 ] 

zhijiang edited comment on FLINK-10662 at 11/9/18 10:07 AM:


[~pnowojski], I am currently working on this. If we change 
{{ChannelSelector#selectChannels}} to return a single {{int}}, there are a few 
choices for the special {{BroadcastPartitioner}} implementation.
 # Return an arbitrary int such as -1 for {{BroadcastPartitioner}}, because 
this value would not actually be used on the {{RecordWriter}} side. We can add 
a shortcut branch for {{BroadcastPartitioner}} in {{RecordWriter}}. This is 
easy to implement, but it may seem a little hacky to return a dummy value from 
the {{selectChannels}} method.
 # Return a `tuple` from {{ChannelSelector#selectChannels}}, where the first 
boolean value indicates whether it is a broadcast or not. If it is, we ignore 
the second element, the selected channel index. I am wondering whether this 
would bring more overhead than the first way above.
 # Define a high-level interface called {{ChannelSelectorBase}} with no 
specific methods; {{ChannelSelector}} then extends {{ChannelSelectorBase}}, and 
{{BroadcastPartitioner}} implements {{ChannelSelectorBase}} directly, because 
it does not need any method to return selected channels. In this way, we 
change the current class structure and also need to adjust the API in 
{{DataStream}} to reference {{ChannelSelectorBase}} instead of 
{{StreamPartitioner}}. It would be more complex to handle all the related 
references.

Currently, I would prefer the first way although it seems a little hacky, or do 
you have other better suggestions? :)


was (Author: zjwang):
[~pnowojski], I am currently working on this. If we change 
{{ChannelSelector#selectChannels}} to return a single {{int}}, there are a few 
choices for the special {{BroadcastPartitioner}} implementation.
 # Return an arbitrary int such as -1 for {{BroadcastPartitioner}}, because 
this value would not actually be used on the {{RecordWriter}} side. We can add 
a shortcut branch for {{BroadcastPartitioner}} in {{RecordWriter}}. This is 
easy to implement, but it may seem a little hacky to return a dummy value from 
the {{selectChannels}} method.
 # Return a `tuple` from {{ChannelSelector#selectChannels}}, where the first 
boolean value indicates whether it is a broadcast or not. If it is, we ignore 
the second element, the selected channel index. I am wondering whether this 
would bring more overhead than the first way above.
 # Define a high-level interface called {{ChannelSelectorBase}} with no 
specific methods; {{ChannelSelector}} then extends {{ChannelSelectorBase}}, and 
{{BroadcastPartitioner}} implements {{ChannelSelectorBase}} directly, because 
it does not need any method to return selected channels. In this way, we 
change the current class structure and also need to adjust the API in 
{{DataStream}} to reference {{ChannelSelectorBase}} instead of 
{{StreamPartitioner}}. It would be more complex to handle all the related 
references.

Currently, I would prefer the first way although it seems a little hacky, or do 
you have other better suggestions? :)

> Refactor the ChannelSelector interface for single selected channel
> --
>
> Key: FLINK-10662
> URL: https://issues.apache.org/jira/browse/FLINK-10662
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.5.4, 1.6.1
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>
> In the discussion of broadcast improvement, [~pnowojski] pointed out the 
> issue of improving the current channel selector.
>  
> In {{ChannelSelector#selectChannels}}, an array of selected channels is 
> returned. But considering the specific implementations, only 
> {{BroadcastPartitioner}} selects all the channels, while the other 
> implementations select a single channel. So we can simplify this interface 
> to return a single channel index to benefit performance, and specialize 
> {{BroadcastPartitioner}} in a more efficient way.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-10662) Refactor the ChannelSelector interface for single selected channel

2018-11-09 Thread zhijiang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16681198#comment-16681198
 ] 

zhijiang edited comment on FLINK-10662 at 11/9/18 10:08 AM:


[~pnowojski], I am currently working on this. If we change 
{{ChannelSelector#selectChannels}} to return a single {{int}}, there are a few 
choices for the special {{BroadcastPartitioner}} implementation.
 # Return an arbitrary int such as -1 for {{BroadcastPartitioner}}, because 
this value would not actually be used on the {{RecordWriter}} side. We can add 
a shortcut branch for {{BroadcastPartitioner}} in {{RecordWriter}}. This is 
easy to implement, but it may seem a little hacky to return a dummy value from 
the {{selectChannels}} method.
 # Return a `tuple` from {{ChannelSelector#selectChannels}}, where the first 
boolean value indicates whether it is a broadcast or not. If it is, we ignore 
the second element, the selected channel index. I am wondering whether this 
would bring more overhead than the first way above.
 # Define a high-level interface called {{ChannelSelectorBase}} with no 
specific methods; {{ChannelSelector}} then extends {{ChannelSelectorBase}}, and 
{{BroadcastPartitioner}} implements {{ChannelSelectorBase}} directly, because 
it does not need any method to return selected channels. In this way, we 
change the current class structure and also need to adjust the API in 
{{DataStream}} to reference {{ChannelSelectorBase}} instead of 
{{StreamPartitioner}}. It would be more complex to handle all the related 
references.

Currently, I would prefer the first way although it seems a little hacky, or do 
you have other better suggestions? :)


was (Author: zjwang):
[~pnowojski], I am currently working on this. If we change 
{{ChannelSelector#selectChannels}} to return a single {{int}}, there are a few 
choices for the special {{BroadcastPartitioner}} implementation.
 # Return an arbitrary int such as -1 for {{BroadcastPartitioner}}, because 
this value would not actually be used on the {{RecordWriter}} side. We can add 
a shortcut branch for {{BroadcastPartitioner}} in {{RecordWriter}}. This is 
easy to implement, but it may seem a little hacky to return a dummy value from 
the {{selectChannels}} method.
 # Return a `tuple` from {{ChannelSelector#selectChannels}}, where the first 
boolean value indicates whether it is a broadcast or not. If it is, we ignore 
the second element, the selected channel index. I am wondering whether this 
would bring more overhead than the first way above.
 # Define a high-level interface called {{ChannelSelectorBase}} with no 
specific methods; {{ChannelSelector}} then extends {{ChannelSelectorBase}}, and 
{{BroadcastPartitioner}} implements {{ChannelSelectorBase}} directly, because 
it does not need any method to return selected channels. In this way, we 
change the current class structure and also need to adjust the API in 
{{DataStream}} to reference {{ChannelSelectorBase}} instead of 
{{StreamPartitioner}}. It would be more complex to handle all the related 
references.

Currently, I would prefer the first way although it seems a little hacky, or do 
you have other better suggestions? :)

> Refactor the ChannelSelector interface for single selected channel
> --
>
> Key: FLINK-10662
> URL: https://issues.apache.org/jira/browse/FLINK-10662
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.5.4, 1.6.1
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>
> In the discussion of broadcast improvement, [~pnowojski] pointed out the 
> issue of improving the current channel selector.
>  
> In {{ChannelSelector#selectChannels}}, an array of selected channels is 
> returned. But considering the specific implementations, only 
> {{BroadcastPartitioner}} selects all the channels, while the other 
> implementations select a single channel. So we can simplify this interface 
> to return a single channel index to benefit performance, and specialize 
> {{BroadcastPartitioner}} in a more efficient way.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10837) Kafka 2.0 test KafkaITCase.testOneToOneSources dead locked on deleteTestTopic?

2018-11-09 Thread Piotr Nowojski (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-10837:
---
Component/s: Kafka Connector

> Kafka 2.0 test KafkaITCase.testOneToOneSources dead locked on deleteTestTopic?
> --
>
> Key: FLINK-10837
> URL: https://issues.apache.org/jira/browse/FLINK-10837
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.7.0
>Reporter: Piotr Nowojski
>Priority: Major
>
> https://api.travis-ci.org/v3/job/452439034/log.txt
> {noformat}
> Tests in error: 
>   
> KafkaITCase.testOneToOneSources:97->KafkaConsumerTestBase.runOneToOneExactlyOnceTest:875->KafkaTestBase.deleteTestTopic:206->Object.wait:502->Object.wait:-2
>  » TestTimedOut
> {noformat}
> {noformat}
> java.lang.InterruptedException
>   at java.lang.Object.wait(Native Method)
>   at java.lang.Object.wait(Object.java:502)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:92)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:262)
>   at 
> org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.deleteTestTopic(KafkaTestEnvironmentImpl.java:158)
>   at 
> org.apache.flink.streaming.connectors.kafka.KafkaTestBase.deleteTestTopic(KafkaTestBase.java:206)
>   at 
> org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.runOneToOneExactlyOnceTest(KafkaConsumerTestBase.java:875)
>   at 
> org.apache.flink.streaming.connectors.kafka.KafkaITCase.testOneToOneSources(KafkaITCase.java:97)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at java.lang.Thread.run(Thread.java:748)
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10838) Rewrite Kafka tests that fail Kafka brokers

2018-11-09 Thread Piotr Nowojski (JIRA)
Piotr Nowojski created FLINK-10838:
--

 Summary: Rewrite Kafka tests that fail Kafka brokers
 Key: FLINK-10838
 URL: https://issues.apache.org/jira/browse/FLINK-10838
 Project: Flink
  Issue Type: Improvement
  Components: Kafka Connector
Affects Versions: 1.7.0
Reporter: Piotr Nowojski


Currently we have many tests that, in order to test for example 
`at-least-once`, fail Kafka brokers or break the network connection to them. 
It seems that those tests are testing Kafka brokers more than our own code, 
and in the process (because of bugs in Kafka) they cause lots of false 
positives.

We could try to rewrite those tests to spawn one of Flink's Task Managers in a 
separate process; instead of failing the network/Kafka broker, we could 
SIGKILL the Task Manager without touching/affecting the ZooKeeper/Kafka 
brokers. This should improve the stability of our tests.
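
For illustration, a rough sketch of the idea (hypothetical; the entry-point 
class and its arguments are assumptions, not an existing Flink test utility):

{code:java}
// Run a TaskManager in a separate JVM and kill it hard, leaving the
// Kafka/ZooKeeper processes untouched.
Process taskManager = new ProcessBuilder(
        "java", "-cp", System.getProperty("java.class.path"),
        "org.apache.flink.runtime.taskexecutor.TaskManagerRunner", // assumed entry point
        "--configDir", "/path/to/conf")                            // assumed arguments
    .inheritIO()
    .start();

// ... run the at-least-once workload while the TaskManager is up ...

taskManager.destroyForcibly(); // delivers SIGKILL on POSIX systems
taskManager.waitFor();
{code}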



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10838) Rewrite Kafka tests that fail Kafka brokers

2018-11-09 Thread Piotr Nowojski (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10838?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-10838:
---
Description: 
Currently we have many tests that, in order to test for example 
`at-least-once`, fail Kafka brokers or break the network connection to them. 
It seems that those tests are testing Kafka brokers more than our own code, 
and in the process (because of bugs in Kafka) they cause lots of false 
positives.

We could try to rewrite those tests to spawn one of Flink's Task Managers in a 
separate process; instead of failing the network/Kafka broker, we could 
SIGKILL the Task Manager without touching/affecting the ZooKeeper/Kafka 
brokers. This should improve the stability of our tests.

CC [~till.rohrmann] [~aljoscha]

  was:
Currently we have many tests that, in order to test for example 
`at-least-once`, fail Kafka brokers or break the network connection to them. 
It seems that those tests are testing Kafka brokers more than our own code, 
and in the process (because of bugs in Kafka) they cause lots of false 
positives.

We could try to rewrite those tests to spawn one of Flink's Task Managers in a 
separate process; instead of failing the network/Kafka broker, we could 
SIGKILL the Task Manager without touching/affecting the ZooKeeper/Kafka 
brokers. This should improve the stability of our tests.


> Rewrite Kafka tests that fail Kafka brokers
> ---
>
> Key: FLINK-10838
> URL: https://issues.apache.org/jira/browse/FLINK-10838
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.7.0
>Reporter: Piotr Nowojski
>Priority: Major
>
> Currently we have many tests that, in order to test for example 
> `at-least-once`, fail Kafka brokers or break the network connection to them. 
> It seems that those tests are testing Kafka brokers more than our own code, 
> and in the process (because of bugs in Kafka) they cause lots of false 
> positives.
> We could try to rewrite those tests to spawn one of Flink's Task Managers in 
> a separate process; instead of failing the network/Kafka broker, we could 
> SIGKILL the Task Manager without touching/affecting the ZooKeeper/Kafka 
> brokers. This should improve the stability of our tests.
> CC [~till.rohrmann] [~aljoscha]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10838) Rewrite Kafka tests that fail Kafka brokers

2018-11-09 Thread Piotr Nowojski (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10838?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-10838:
---
 Priority: Critical  (was: Major)
Fix Version/s: 1.8.0

> Rewrite Kafka tests that fail Kafka brokers
> ---
>
> Key: FLINK-10838
> URL: https://issues.apache.org/jira/browse/FLINK-10838
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.7.0
>Reporter: Piotr Nowojski
>Priority: Critical
> Fix For: 1.8.0
>
>
> Currently we have many tests that, in order to test for example 
> `at-least-once`, fail Kafka brokers or break the network connection to them. 
> It seems that those tests are testing Kafka brokers more than our own code, 
> and in the process (because of bugs in Kafka) they cause lots of false 
> positives.
> We could try to rewrite those tests to spawn one of Flink's Task Managers in 
> a separate process; instead of failing the network/Kafka broker, we could 
> SIGKILL the Task Manager without touching/affecting the ZooKeeper/Kafka 
> brokers. This should improve the stability of our tests.
> CC [~till.rohrmann] [~aljoscha]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10624) Extend SQL client end-to-end to test new KafkaTableSink

2018-11-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16681228#comment-16681228
 ] 

ASF GitHub Bot commented on FLINK-10624:


pnowojski commented on a change in pull request #6927: [FLINK-10624] Extend SQL 
client end-to-end to test new KafkaTableSink
URL: https://github.com/apache/flink/pull/6927#discussion_r232207878
 
 

 ##
 File path: flink-end-to-end-tests/flink-sql-client-test-kafka-0.10/pom.xml
 ##
 @@ -0,0 +1,165 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>flink-end-to-end-tests</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>1.8-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>flink-sql-client-test-kafka-0.10_${scala.binary.version}</artifactId>
+    <name>flink-sql-client-test-kafka-0.10</name>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-sql-client-test</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-avro</artifactId>
+            <version>${project.version}</version>
+            <classifier>sql-jar</classifier>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-json</artifactId>
+            <version>${project.version}</version>
+            <classifier>sql-jar</classifier>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <classifier>sql-jar</classifier>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId>
 
 Review comment:
   Why do we need `elasticsearch` in `flink-sql-client-test-kafka-0.10`? The 
same question applies to the 2.0 connector.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Extend SQL client end-to-end to test new KafkaTableSink
> ---
>
> Key: FLINK-10624
> URL: https://issues.apache.org/jira/browse/FLINK-10624
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector, Table API & SQL, Tests
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> With FLINK-9697, we added support for Kafka 2.0. We should extend the 
> existing streaming client end-to-end test to also test the new 
> {{KafkaTableSink}} against Kafka 2.0.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] pnowojski commented on a change in pull request #6927: [FLINK-10624] Extend SQL client end-to-end to test new KafkaTableSink

2018-11-09 Thread GitBox
pnowojski commented on a change in pull request #6927: [FLINK-10624] Extend SQL 
client end-to-end to test new KafkaTableSink
URL: https://github.com/apache/flink/pull/6927#discussion_r232207878
 
 

 ##
 File path: flink-end-to-end-tests/flink-sql-client-test-kafka-0.10/pom.xml
 ##
 @@ -0,0 +1,165 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>flink-end-to-end-tests</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>1.8-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>flink-sql-client-test-kafka-0.10_${scala.binary.version}</artifactId>
+    <name>flink-sql-client-test-kafka-0.10</name>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-sql-client-test</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-avro</artifactId>
+            <version>${project.version}</version>
+            <classifier>sql-jar</classifier>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-json</artifactId>
+            <version>${project.version}</version>
+            <classifier>sql-jar</classifier>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <classifier>sql-jar</classifier>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId>
 
 Review comment:
   Why do we need `elasticsearch` in `flink-sql-client-test-kafka-0.10`? The 
same question applies to the 2.0 connector.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10624) Extend SQL client end-to-end to test new KafkaTableSink

2018-11-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16681227#comment-16681227
 ] 

ASF GitHub Bot commented on FLINK-10624:


pnowojski commented on a change in pull request #6927: [FLINK-10624] Extend SQL 
client end-to-end to test new KafkaTableSink
URL: https://github.com/apache/flink/pull/6927#discussion_r232207974
 
 

 ##
 File path: flink-end-to-end-tests/flink-sql-client-test-kafka-0.10/pom.xml
 ##
 @@ -0,0 +1,165 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>flink-end-to-end-tests</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>1.8-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>flink-sql-client-test-kafka-0.10_${scala.binary.version}</artifactId>
+    <name>flink-sql-client-test-kafka-0.10</name>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-sql-client-test</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-avro</artifactId>
+            <version>${project.version}</version>
+            <classifier>sql-jar</classifier>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-json</artifactId>
+            <version>${project.version}</version>
+            <classifier>sql-jar</classifier>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <classifier>sql-jar</classifier>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <classifier>sql-jar</classifier>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
> Key: FLINK-10624
> URL: https://issues.apache.org/jira/browse/FLINK-10624
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector, Table API & SQL, Tests
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> With FLINK-9697, we added support for Kafka 2.0. We should extend the 
> existing streaming client end-to-end test to also test the new 
> {{KafkaTableSink}} against Kafka 2.0.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] pnowojski commented on a change in pull request #6927: [FLINK-10624] Extend SQL client end-to-end to test new KafkaTableSink

2018-11-09 Thread GitBox
pnowojski commented on a change in pull request #6927: [FLINK-10624] Extend SQL 
client end-to-end to test new KafkaTableSink
URL: https://github.com/apache/flink/pull/6927#discussion_r232207974
 
 

 ##
 File path: flink-end-to-end-tests/flink-sql-client-test-kafka-0.10/pom.xml
 ##
 @@ -0,0 +1,165 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>flink-end-to-end-tests</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>1.8-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>flink-sql-client-test-kafka-0.10_${scala.binary.version}</artifactId>
+    <name>flink-sql-client-test-kafka-0.10</name>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-sql-client-test</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-avro</artifactId>
+            <version>${project.version}</version>
+            <classifier>sql-jar</classifier>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-json</artifactId>
+            <version>${project.version}</version>
+            <classifier>sql-jar</classifier>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <classifier>sql-jar</classifier>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <classifier>sql-jar</classifier>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>

[jira] [Commented] (FLINK-10834) TableAPI flatten() calculated value error

2018-11-09 Thread sunjincheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16681236#comment-16681236
 ] 

sunjincheng commented on FLINK-10834:
-

Hi [~fhueske],

Thanks for your comments!

I think the expected steps performed by *FuncRow('a).flatten()* are:
 # FuncRow.eval() output: Row(-1189206469, -1189206469, -1189206469)
 # Row(-1189206469, -1189206469, -1189206469).flatten() output: -1189206469, 
-1189206469, -1189206469

Our current implementation logic of *FuncRow('a).flatten()* is:
 # FuncRow.eval().get(0) output: -1189206469
 # FuncRow.eval().get(1) output: -151367792
 # FuncRow.eval().get(2) output: 1988676906

So there are two problems:
 # For deterministic functions, the execution performance will be poor;
 # For non-deterministic functions, the execution result is incorrect.

And I think maybe we can fix this by improving our Flink implementation logic 
of *flatten()*. Maybe we can find solutions in two places: *expandProjectList* 
and *CodeGenerator#visitFieldAccess*. Try to reuse the rexCall or reuse the 
generated code string.

What do you think?

Thanks, Jincheng 
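
For illustration, the difference can be sketched roughly as follows (a 
conceptual Java sketch only, not the actual generated code; {{funcRow}} stands 
for the compiled UDF call):

{code:java}
// Current behavior (conceptual): each flattened field re-invokes the UDF,
// so a non-deterministic function can produce three different values.
String v1 = (String) funcRow(a).getField(0);
String v2 = (String) funcRow(a).getField(1);
String v3 = (String) funcRow(a).getField(2);

// Expected behavior (conceptual): evaluate once and reuse the Row,
// so the three fields are always equal.
Row row = funcRow(a);
String w1 = (String) row.getField(0);
String w2 = (String) row.getField(1);
String w3 = (String) row.getField(2);
{code}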

> TableAPI flatten() calculated value error
> -
>
> Key: FLINK-10834
> URL: https://issues.apache.org/jira/browse/FLINK-10834
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: Hequn Cheng
>Priority: Major
> Fix For: 1.7.1
>
>
> We have a UDF as follows:
> {code:java}
> object FuncRow extends ScalarFunction {
> def eval(v: Int): Row = { 
>   val version = "" + new Random().nextInt()          
>   val row = new Row(3)          
>   row.setField(0, version)          
>   row.setField(1, version)          
>   row.setField(2, version)          
>   row 
> }
> override def isDeterministic: Boolean = false
> override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
>  Types.ROW(Types.STRING, Types.STRING, Types.STRING)
> }
> {code}
> Do the following Query:
> {code:sql}
> val data = new mutable.MutableList[(Int, Long, String)]
>  data.+=((1, 1L, "Hi"))
>  val ds = env.fromCollection(data).toTable(tEnv, 'a, 'b,'c)
>  .select(FuncRow('a).flatten()).as('v1, 'v2, 'v3)
> {code}
> The result is: -1189206469,-151367792,1988676906
> The result expected by the user is v1 == v2 == v3.
> It looks like the real reason is that the UDF result is not reused in the 
> generated code.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] ambition119 commented on issue #7067: [FLINK-10674] [table] DistinctAccumulator.remove occur NPE

2018-11-09 Thread GitBox
ambition119 commented on issue #7067: [FLINK-10674] [table] 
DistinctAccumulator.remove occur NPE
URL: https://github.com/apache/flink/pull/7067#issuecomment-437321281
 
 
   Complex Event Processing (CEP) SQL realization is a new pull request; it is 
a new feature. It uses this statement:
   ```sql 
   SELECT [ ALL | DISTINCT ]
   { * | projectItem [, projectItem ]* }
   FROM tableExpression
   [MATCH_RECOGNIZE (
   [PARTITION BY {partitionItem [, partitionItem]*}]
   [ORDER BY {orderItem [, orderItem]*}]
   [MEASURES {measureItem AS col [, measureItem AS col]*}]
   [ONE ROW PER MATCH|ALL ROWS PER MATCH|ONE ROW PER MATCH WITH TIMEOUT 
ROWS|ALL ROWS PER MATCH WITH TIMEOUT ROWS]
   [AFTER MATCH SKIP]
   PATTERN (patternVariable[quantifier] [ patternVariable[quantifier]]*) WITHIN 
intervalExpression
   DEFINE {patternVariable AS patternDefinationExpression [, patternVariable AS 
patternDefinationExpression]*}
   )];
   
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10674) DistinctAccumulator.remove lead to NPE

2018-11-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16681238#comment-16681238
 ] 

ASF GitHub Bot commented on FLINK-10674:


ambition119 commented on issue #7067: [FLINK-10674] [table] 
DistinctAccumulator.remove occur NPE
URL: https://github.com/apache/flink/pull/7067#issuecomment-437321281
 
 
   Complex Event Processing (CEP) SQL realization is a new pull request; it is 
a new feature. It uses this statement:
   ```sql 
   SELECT [ ALL | DISTINCT ]
   { * | projectItem [, projectItem ]* }
   FROM tableExpression
   [MATCH_RECOGNIZE (
   [PARTITION BY {partitionItem [, partitionItem]*}]
   [ORDER BY {orderItem [, orderItem]*}]
   [MEASURES {measureItem AS col [, measureItem AS col]*}]
   [ONE ROW PER MATCH|ALL ROWS PER MATCH|ONE ROW PER MATCH WITH TIMEOUT 
ROWS|ALL ROWS PER MATCH WITH TIMEOUT ROWS]
   [AFTER MATCH SKIP]
   PATTERN (patternVariable[quantifier] [ patternVariable[quantifier]]*) WITHIN 
intervalExpression
   DEFINE {patternVariable AS patternDefinationExpression [, patternVariable AS 
patternDefinationExpression]*}
   )];
   
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> DistinctAccumulator.remove lead to NPE
> --
>
> Key: FLINK-10674
> URL: https://issues.apache.org/jira/browse/FLINK-10674
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.6.1
> Environment: Flink 1.6.0
>Reporter: ambition
>Assignee: winifredtang
>Priority: Minor
>  Labels: pull-request-available
> Attachments: image-2018-10-25-14-46-03-373.png
>
>
> Our online Flink job ran for about a week; the job contains this SQL:
> {code:java}
> select  `time`,  
> lower(trim(os_type)) as os_type, 
> count(distinct feed_id) as feed_total_view  
> from  my_table 
> group by `time`, lower(trim(os_type)){code}
>  
>   Then this NPE occurred: 
>  
> {code:java}
> java.lang.NullPointerException
> at scala.Predef$.Long2long(Predef.scala:363)
> at 
> org.apache.flink.table.functions.aggfunctions.DistinctAccumulator.remove(DistinctAccumulator.scala:109)
> at NonWindowedAggregationHelper$894.retract(Unknown Source)
> at 
> org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.processElement(GroupAggProcessFunction.scala:124)
> at 
> org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.processElement(GroupAggProcessFunction.scala:39)
> at 
> org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.processElement(LegacyKeyedProcessOperator.java:88)
> at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> at java.lang.Thread.run(Thread.java:745)
> {code}
>  
>  
> View DistinctAccumulator.remove
> !image-2018-10-25-14-46-03-373.png!
>  
> This NPE is likely caused by currentCnt being null, so we simply handle it like this:
> {code:java}
> def remove(params: Row): Boolean = {
>   if(!distinctValueMap.contains(params)){
> true
>   }else{
> val currentCnt = distinctValueMap.get(params)
> // currentCnt may be null here, which is what caused the NPE above
> if (currentCnt == null || currentCnt == 1) {
>   distinctValueMap.remove(params)
>   true
> } else {
>   var value = currentCnt - 1L
>   if(value < 0){
> value = 1
>   }
>   distinctValueMap.put(params, value)
>   false
> }
>   }
> }{code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] ambition119 edited a comment on issue #7067: [FLINK-10674] [table] DistinctAccumulator.remove occur NPE

2018-11-09 Thread GitBox
ambition119 edited a comment on issue #7067: [FLINK-10674] [table] 
DistinctAccumulator.remove occur NPE
URL: https://github.com/apache/flink/pull/7067#issuecomment-437321281
 
 
   Complex Event Processing (CEP) SQL realization is a new pull request; it is 
a new feature. It uses this statement:
   ```sql 
   SELECT [ ALL | DISTINCT ]
   { * | projectItem [, projectItem ]* }
   FROM tableExpression
   [MATCH_RECOGNIZE (
   [PARTITION BY {partitionItem [, partitionItem]*}]
   [ORDER BY {orderItem [, orderItem]*}]
   [MEASURES {measureItem AS col [, measureItem AS col]*}]
   [ONE ROW PER MATCH|ALL ROWS PER MATCH|ONE ROW PER MATCH WITH TIMEOUT 
ROWS|ALL ROWS PER MATCH WITH TIMEOUT ROWS]
   [AFTER MATCH SKIP]
   PATTERN (patternVariable[quantifier] [ patternVariable[quantifier]]*) WITHIN 
intervalExpression
   DEFINE {patternVariable AS patternDefinationExpression [, patternVariable AS 
patternDefinationExpression]*}
   )];
   
   ```
   I can't create a new pull request, sorry.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10674) DistinctAccumulator.remove lead to NPE

2018-11-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16681242#comment-16681242
 ] 

ASF GitHub Bot commented on FLINK-10674:


ambition119 edited a comment on issue #7067: [FLINK-10674] [table] 
DistinctAccumulator.remove occur NPE
URL: https://github.com/apache/flink/pull/7067#issuecomment-437321281
 
 
   Complex Event Processing (CEP) SQL realization is a new pull request; it is 
a new feature. It uses this statement:
   ```sql 
   SELECT [ ALL | DISTINCT ]
   { * | projectItem [, projectItem ]* }
   FROM tableExpression
   [MATCH_RECOGNIZE (
   [PARTITION BY {partitionItem [, partitionItem]*}]
   [ORDER BY {orderItem [, orderItem]*}]
   [MEASURES {measureItem AS col [, measureItem AS col]*}]
   [ONE ROW PER MATCH|ALL ROWS PER MATCH|ONE ROW PER MATCH WITH TIMEOUT 
ROWS|ALL ROWS PER MATCH WITH TIMEOUT ROWS]
   [AFTER MATCH SKIP]
   PATTERN (patternVariable[quantifier] [ patternVariable[quantifier]]*) WITHIN 
intervalExpression
   DEFINE {patternVariable AS patternDefinationExpression [, patternVariable AS 
patternDefinationExpression]*}
   )];
   
   ```
   I can't create a new pull request, sorry.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> DistinctAccumulator.remove lead to NPE
> --
>
> Key: FLINK-10674
> URL: https://issues.apache.org/jira/browse/FLINK-10674
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.6.1
> Environment: Flink 1.6.0
>Reporter: ambition
>Assignee: winifredtang
>Priority: Minor
>  Labels: pull-request-available
> Attachments: image-2018-10-25-14-46-03-373.png
>
>
> Our online Flink job ran for about a week; the job contains this SQL:
> {code:java}
> select  `time`,  
> lower(trim(os_type)) as os_type, 
> count(distinct feed_id) as feed_total_view  
> from  my_table 
> group by `time`, lower(trim(os_type)){code}
>  
>   Then this NPE occurred: 
>  
> {code:java}
> java.lang.NullPointerException
> at scala.Predef$.Long2long(Predef.scala:363)
> at 
> org.apache.flink.table.functions.aggfunctions.DistinctAccumulator.remove(DistinctAccumulator.scala:109)
> at NonWindowedAggregationHelper$894.retract(Unknown Source)
> at 
> org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.processElement(GroupAggProcessFunction.scala:124)
> at 
> org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.processElement(GroupAggProcessFunction.scala:39)
> at 
> org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.processElement(LegacyKeyedProcessOperator.java:88)
> at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> at java.lang.Thread.run(Thread.java:745)
> {code}
>  
>  
> View DistinctAccumulator.remove
> !image-2018-10-25-14-46-03-373.png!
>  
> This NPE is likely caused by currentCnt being null, so we simply handle it like this:
> {code:java}
> def remove(params: Row): Boolean = {
>   if(!distinctValueMap.contains(params)){
> true
>   }else{
> val currentCnt = distinctValueMap.get(params)
> // currentCnt may be null here, which is what caused the NPE above
> if (currentCnt == null || currentCnt == 1) {
>   distinctValueMap.remove(params)
>   true
> } else {
>   var value = currentCnt - 1L
>   if(value < 0){
> value = 1
>   }
>   distinctValueMap.put(params, value)
>   false
> }
>   }
> }{code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10674) DistinctAccumulator.remove lead to NPE

2018-11-09 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10674?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10674:
---
Labels: pull-request-available  (was: )

> DistinctAccumulator.remove lead to NPE
> --
>
> Key: FLINK-10674
> URL: https://issues.apache.org/jira/browse/FLINK-10674
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.6.1
> Environment: Flink 1.6.0
>Reporter: ambition
>Assignee: winifredtang
>Priority: Minor
>  Labels: pull-request-available
> Attachments: image-2018-10-25-14-46-03-373.png
>
>
> Our online Flink job ran for about a week; the job contains this SQL:
> {code:java}
> select  `time`,  
> lower(trim(os_type)) as os_type, 
> count(distinct feed_id) as feed_total_view  
> from  my_table 
> group by `time`, lower(trim(os_type)){code}
>  
>   Then this NPE occurred: 
>  
> {code:java}
> java.lang.NullPointerException
> at scala.Predef$.Long2long(Predef.scala:363)
> at 
> org.apache.flink.table.functions.aggfunctions.DistinctAccumulator.remove(DistinctAccumulator.scala:109)
> at NonWindowedAggregationHelper$894.retract(Unknown Source)
> at 
> org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.processElement(GroupAggProcessFunction.scala:124)
> at 
> org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.processElement(GroupAggProcessFunction.scala:39)
> at 
> org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.processElement(LegacyKeyedProcessOperator.java:88)
> at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> at java.lang.Thread.run(Thread.java:745)
> {code}
>  
>  
> Looking at DistinctAccumulator.remove:
> !image-2018-10-25-14-46-03-373.png!
>  
> This NPE should be caused by currentCnt being null, so we handle it simply like this:
> {code:java}
> def remove(params: Row): Boolean = {
>   if (!distinctValueMap.contains(params)) {
>     true
>   } else {
>     val currentCnt = distinctValueMap.get(params)
>     // currentCnt can be null here, which is what triggers the NPE on unboxing
>     if (currentCnt == null || currentCnt == 1) {
>       distinctValueMap.remove(params)
>       true
>     } else {
>       var value = currentCnt - 1L
>       // defensively reset a count that somehow became negative
>       if (value < 0) {
>         value = 1
>       }
>       distinctValueMap.put(params, value)
>       false
>     }
>   }
> }{code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] twalthr commented on a change in pull request #6927: [FLINK-10624] Extend SQL client end-to-end to test new KafkaTableSink

2018-11-09 Thread GitBox
twalthr commented on a change in pull request #6927: [FLINK-10624] Extend SQL 
client end-to-end to test new KafkaTableSink
URL: https://github.com/apache/flink/pull/6927#discussion_r232213530
 
 

 ##
 File path: flink-end-to-end-tests/flink-sql-client-test-kafka-0.10/pom.xml
 ##
 @@ -0,0 +1,165 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
 
 Review comment:
   We don't need a Kafka-version-specific `pom.xml`. The `flink-sql-client-test/pom.xml` is sufficient; having both Kafka versions there shouldn't be a problem. The module makes sure that all transitively required Flink modules are built and copies the SQL jars into a common directory using Maven.
   
   The dependency clashes only occur when executing the SQL Client test. This means that the test should exclude the clashing jars for SQL statement execution. This has nothing to do with Maven but is purely a test-script concern.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10624) Extend SQL client end-to-end to test new KafkaTableSink

2018-11-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16681251#comment-16681251
 ] 

ASF GitHub Bot commented on FLINK-10624:


twalthr commented on a change in pull request #6927: [FLINK-10624] Extend SQL 
client end-to-end to test new KafkaTableSink
URL: https://github.com/apache/flink/pull/6927#discussion_r232213530
 
 

 ##
 File path: flink-end-to-end-tests/flink-sql-client-test-kafka-0.10/pom.xml
 ##
 @@ -0,0 +1,165 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
 
 Review comment:
   We don't need a Kafka-version-specific `pom.xml`. The `flink-sql-client-test/pom.xml` is sufficient; having both Kafka versions there shouldn't be a problem. The module makes sure that all transitively required Flink modules are built and copies the SQL jars into a common directory using Maven.
   
   The dependency clashes only occur when executing the SQL Client test. This means that the test should exclude the clashing jars for SQL statement execution. This has nothing to do with Maven but is purely a test-script concern.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Extend SQL client end-to-end to test new KafkaTableSink
> ---
>
> Key: FLINK-10624
> URL: https://issues.apache.org/jira/browse/FLINK-10624
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector, Table API & SQL, Tests
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> With FLINK-9697, we added support for Kafka 2.0. We should also extend the 
> existing streaming client end-to-end test to also test the new 
> {{KafkaTableSink}} against Kafka 2.0.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yanghua commented on a change in pull request #6927: [FLINK-10624] Extend SQL client end-to-end to test new KafkaTableSink

2018-11-09 Thread GitBox
yanghua commented on a change in pull request #6927: [FLINK-10624] Extend SQL 
client end-to-end to test new KafkaTableSink
URL: https://github.com/apache/flink/pull/6927#discussion_r232219046
 
 

 ##
 File path: flink-end-to-end-tests/flink-sql-client-test-kafka-0.10/pom.xml
 ##
 @@ -0,0 +1,165 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
 
 Review comment:
   @twalthr I think distinguishing the different versions of the Kafka connector makes it clearer (just like the Kafka connector end-to-end tests). It was difficult to tell the dependencies apart in a single pom before I split it. Currently we use `<classifier>` to classify them, but it seems that we can't add two `<classifier>` elements to one dependency.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10624) Extend SQL client end-to-end to test new KafkaTableSink

2018-11-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16681275#comment-16681275
 ] 

ASF GitHub Bot commented on FLINK-10624:


yanghua commented on a change in pull request #6927: [FLINK-10624] Extend SQL 
client end-to-end to test new KafkaTableSink
URL: https://github.com/apache/flink/pull/6927#discussion_r232219046
 
 

 ##
 File path: flink-end-to-end-tests/flink-sql-client-test-kafka-0.10/pom.xml
 ##
 @@ -0,0 +1,165 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
 
 Review comment:
   @twalthr I think distinguishing the different versions of the Kafka connector makes it clearer (just like the Kafka connector end-to-end tests). It was difficult to tell the dependencies apart in a single pom before I split it. Currently we use `<classifier>` to classify them, but it seems that we can't add two `<classifier>` elements to one dependency.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Extend SQL client end-to-end to test new KafkaTableSink
> ---
>
> Key: FLINK-10624
> URL: https://issues.apache.org/jira/browse/FLINK-10624
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector, Table API & SQL, Tests
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> With FLINK-9697, we added support for Kafka 2.0. We should also extend the 
> existing streaming client end-to-end test to also test the new 
> {{KafkaTableSink}} against Kafka 2.0.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yanghua commented on a change in pull request #6927: [FLINK-10624] Extend SQL client end-to-end to test new KafkaTableSink

2018-11-09 Thread GitBox
yanghua commented on a change in pull request #6927: [FLINK-10624] Extend SQL 
client end-to-end to test new KafkaTableSink
URL: https://github.com/apache/flink/pull/6927#discussion_r232220070
 
 

 ##
 File path: flink-end-to-end-tests/flink-sql-client-test-kafka-0.10/pom.xml
 ##
 @@ -0,0 +1,165 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<parent>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.8-SNAPSHOT</version>
+	</parent>
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>flink-sql-client-test-kafka-0.10_${scala.binary.version}</artifactId>
+	<name>flink-sql-client-test-kafka-0.10</name>
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-sql-client-test</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-avro</artifactId>
+			<version>${project.version}</version>
+			<classifier>sql-jar</classifier>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-json</artifactId>
+			<version>${project.version}</version>
+			<classifier>sql-jar</classifier>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<classifier>sql-jar</classifier>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId>
 
 Review comment:
   @pnowojski The `test_sql_client_common.sh` contains the Elasticsearch connector.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10624) Extend SQL client end-to-end to test new KafkaTableSink

2018-11-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16681282#comment-16681282
 ] 

ASF GitHub Bot commented on FLINK-10624:


yanghua commented on a change in pull request #6927: [FLINK-10624] Extend SQL 
client end-to-end to test new KafkaTableSink
URL: https://github.com/apache/flink/pull/6927#discussion_r232220070
 
 

 ##
 File path: flink-end-to-end-tests/flink-sql-client-test-kafka-0.10/pom.xml
 ##
 @@ -0,0 +1,165 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<parent>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.8-SNAPSHOT</version>
+	</parent>
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>flink-sql-client-test-kafka-0.10_${scala.binary.version}</artifactId>
+	<name>flink-sql-client-test-kafka-0.10</name>
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-sql-client-test</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-avro</artifactId>
+			<version>${project.version}</version>
+			<classifier>sql-jar</classifier>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-json</artifactId>
+			<version>${project.version}</version>
+			<classifier>sql-jar</classifier>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<classifier>sql-jar</classifier>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId>
 
 Review comment:
   @pnowojski The `test_sql_client_common.sh` contains the Elasticsearch connector.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Extend SQL client end-to-end to test new KafkaTableSink
> ---
>
> Key: FLINK-10624
> URL: https://issues.apache.org/jira/browse/FLINK-10624
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector, Table API & SQL, Tests
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> With FLINK-9697, we added support for Kafka 2.0. We should also extend the 
> existing streaming client end-to-end test to also test the new 
> {{KafkaTableSink}} against Kafka 2.0.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] ambition119 edited a comment on issue #7067: [FLINK-10674] [table] DistinctAccumulator.remove occur NPE

2018-11-09 Thread GitBox
ambition119 edited a comment on issue #7067: [FLINK-10674] [table] 
DistinctAccumulator.remove occur NPE
URL: https://github.com/apache/flink/pull/7067#issuecomment-437321281
 
 
   Complex Event Processing (CEP) SQL realization is a new feature. The syntax is:
   ```sql 
   SELECT [ ALL | DISTINCT ]
   { * | projectItem [, projectItem ]* }
   FROM tableExpression
   [MATCH_RECOGNIZE (
   [PARTITION BY {partitionItem [, partitionItem]*}]
   [ORDER BY {orderItem [, orderItem]*}]
   [MEASURES {measureItem AS col [, measureItem AS col]*}]
   [ONE ROW PER MATCH|ALL ROWS PER MATCH|ONE ROW PER MATCH WITH TIMEOUT ROWS|ALL ROWS PER MATCH WITH TIMEOUT ROWS]
   [AFTER MATCH SKIP]
   PATTERN (patternVariable[quantifier] [ patternVariable[quantifier]]*) WITHIN intervalExpression
   DEFINE {patternVariable AS patternDefinitionExpression [, patternVariable AS patternDefinitionExpression]*}
   )];
   
   ```
   I can't create a new pull request, sorry.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10674) DistinctAccumulator.remove lead to NPE

2018-11-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16681288#comment-16681288
 ] 

ASF GitHub Bot commented on FLINK-10674:


ambition119 edited a comment on issue #7067: [FLINK-10674] [table] 
DistinctAccumulator.remove occur NPE
URL: https://github.com/apache/flink/pull/7067#issuecomment-437321281
 
 
   Complex Event Processing (CEP) SQL realization is a new feature. The syntax is:
   ```sql 
   SELECT [ ALL | DISTINCT ]
   { * | projectItem [, projectItem ]* }
   FROM tableExpression
   [MATCH_RECOGNIZE (
   [PARTITION BY {partitionItem [, partitionItem]*}]
   [ORDER BY {orderItem [, orderItem]*}]
   [MEASURES {measureItem AS col [, measureItem AS col]*}]
   [ONE ROW PER MATCH|ALL ROWS PER MATCH|ONE ROW PER MATCH WITH TIMEOUT ROWS|ALL ROWS PER MATCH WITH TIMEOUT ROWS]
   [AFTER MATCH SKIP]
   PATTERN (patternVariable[quantifier] [ patternVariable[quantifier]]*) WITHIN intervalExpression
   DEFINE {patternVariable AS patternDefinitionExpression [, patternVariable AS patternDefinitionExpression]*}
   )];
   
   ```
   I can't create a new pull request, sorry.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> DistinctAccumulator.remove lead to NPE
> --
>
> Key: FLINK-10674
> URL: https://issues.apache.org/jira/browse/FLINK-10674
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.6.1
> Environment: Flink 1.6.0
>Reporter: ambition
>Assignee: winifredtang
>Priority: Minor
>  Labels: pull-request-available
> Attachments: image-2018-10-25-14-46-03-373.png
>
>
> Our online Flink job had been running for about a week; the job contains this SQL:
> {code:java}
> select  `time`,  
> lower(trim(os_type)) as os_type, 
> count(distinct feed_id) as feed_total_view  
> from  my_table 
> group by `time`, lower(trim(os_type)){code}
>  
>   then this NPE occurred:
>  
> {code:java}
> java.lang.NullPointerException
> at scala.Predef$.Long2long(Predef.scala:363)
> at 
> org.apache.flink.table.functions.aggfunctions.DistinctAccumulator.remove(DistinctAccumulator.scala:109)
> at NonWindowedAggregationHelper$894.retract(Unknown Source)
> at 
> org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.processElement(GroupAggProcessFunction.scala:124)
> at 
> org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.processElement(GroupAggProcessFunction.scala:39)
> at 
> org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.processElement(LegacyKeyedProcessOperator.java:88)
> at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> at java.lang.Thread.run(Thread.java:745)
> {code}
>  
>  
> Looking at DistinctAccumulator.remove:
> !image-2018-10-25-14-46-03-373.png!
>  
> This NPE should be caused by currentCnt being null, so we handle it simply like this:
> {code:java}
> def remove(params: Row): Boolean = {
>   if (!distinctValueMap.contains(params)) {
>     true
>   } else {
>     val currentCnt = distinctValueMap.get(params)
>     // currentCnt can be null here, which is what triggers the NPE on unboxing
>     if (currentCnt == null || currentCnt == 1) {
>       distinctValueMap.remove(params)
>       true
>     } else {
>       var value = currentCnt - 1L
>       // defensively reset a count that somehow became negative
>       if (value < 0) {
>         value = 1
>       }
>       distinctValueMap.put(params, value)
>       false
>     }
>   }
> }{code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yanghua commented on a change in pull request #6927: [FLINK-10624] Extend SQL client end-to-end to test new KafkaTableSink

2018-11-09 Thread GitBox
yanghua commented on a change in pull request #6927: [FLINK-10624] Extend SQL 
client end-to-end to test new KafkaTableSink
URL: https://github.com/apache/flink/pull/6927#discussion_r232221415
 
 

 ##
 File path: flink-end-to-end-tests/flink-sql-client-test-kafka-0.10/pom.xml
 ##
 @@ -0,0 +1,165 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<parent>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.8-SNAPSHOT</version>
+	</parent>
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>flink-sql-client-test-kafka-0.10_${scala.binary.version}</artifactId>
+	<name>flink-sql-client-test-kafka-0.10</name>
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-sql-client-test</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-avro</artifactId>
+			<version>${project.version}</version>
+			<classifier>sql-jar</classifier>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-json</artifactId>
+			<version>${project.version}</version>
+			<classifier>sql-jar</classifier>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<classifier>sql-jar</classifier>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId>
 
 Review comment:
   Maybe the module name misled you. Before this change, it was just named "flink-sql-client-test". To distinguish the different versions of the Kafka connector, I included the Kafka version in the module name, but I did not add Elasticsearch, which may be misleading. Including Elasticsearch, however, would make the name too long.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10624) Extend SQL client end-to-end to test new KafkaTableSink

2018-11-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16681290#comment-16681290
 ] 

ASF GitHub Bot commented on FLINK-10624:


yanghua commented on a change in pull request #6927: [FLINK-10624] Extend SQL 
client end-to-end to test new KafkaTableSink
URL: https://github.com/apache/flink/pull/6927#discussion_r232221415
 
 

 ##
 File path: flink-end-to-end-tests/flink-sql-client-test-kafka-0.10/pom.xml
 ##
 @@ -0,0 +1,165 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<parent>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.8-SNAPSHOT</version>
+	</parent>
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>flink-sql-client-test-kafka-0.10_${scala.binary.version}</artifactId>
+	<name>flink-sql-client-test-kafka-0.10</name>
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-sql-client-test</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-avro</artifactId>
+			<version>${project.version}</version>
+			<classifier>sql-jar</classifier>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-json</artifactId>
+			<version>${project.version}</version>
+			<classifier>sql-jar</classifier>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<classifier>sql-jar</classifier>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId>
 
 Review comment:
   Maybe the module name misled you. Before this change, it was just named "flink-sql-client-test". To distinguish the different versions of the Kafka connector, I included the Kafka version in the module name, but I did not add Elasticsearch, which may be misleading. Including Elasticsearch, however, would make the name too long.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Extend SQL client end-to-end to test new KafkaTableSink
> ---
>
> Key: FLINK-10624
> URL: https://issues.apache.org/jira/browse/FLINK-10624
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector, Table API & SQL, Tests
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> With FLINK-9697, we added support for Kafka 2.0. We should also extend the 
> existing streaming client end-to-end test to also test the new 
> {{KafkaTableSink}} against Kafka 2.0.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] twalthr commented on a change in pull request #6927: [FLINK-10624] Extend SQL client end-to-end to test new KafkaTableSink

2018-11-09 Thread GitBox
twalthr commented on a change in pull request #6927: [FLINK-10624] Extend SQL 
client end-to-end to test new KafkaTableSink
URL: https://github.com/apache/flink/pull/6927#discussion_r232224203
 
 

 ##
 File path: flink-end-to-end-tests/flink-sql-client-test-kafka-0.10/pom.xml
 ##
 @@ -0,0 +1,165 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
 
 Review comment:
   Distinguishing the different versions in the test scripts makes sense, but we don't need to do it in the Maven module, as this module is just responsible for collecting jars. Different Kafka versions were in this `pom.xml` before without any problems.
   
   What do you mean by two `classifier` elements?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10624) Extend SQL client end-to-end to test new KafkaTableSink

2018-11-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16681305#comment-16681305
 ] 

ASF GitHub Bot commented on FLINK-10624:


twalthr commented on a change in pull request #6927: [FLINK-10624] Extend SQL 
client end-to-end to test new KafkaTableSink
URL: https://github.com/apache/flink/pull/6927#discussion_r232224203
 
 

 ##
 File path: flink-end-to-end-tests/flink-sql-client-test-kafka-0.10/pom.xml
 ##
 @@ -0,0 +1,165 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
 
 Review comment:
   Distinguishing the different versions in the test scripts makes sense, but we don't need to do it in the Maven module, as this module is just responsible for collecting jars. Different Kafka versions were in this `pom.xml` before without any problems.
   
   What do you mean by two `classifier` elements?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Extend SQL client end-to-end to test new KafkaTableSink
> ---
>
> Key: FLINK-10624
> URL: https://issues.apache.org/jira/browse/FLINK-10624
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector, Table API & SQL, Tests
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> With FLINK-9697, we added support for Kafka 2.0. We should also extend the 
> existing streaming client end-to-end test to also test the new 
> {{KafkaTableSink}} against Kafka 2.0.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] dawidwys opened a new pull request #7070: [FLINK-10625] Documentation for MATCH_RECOGNIZE clause

2018-11-09 Thread GitBox
dawidwys opened a new pull request #7070: [FLINK-10625] Documentation for 
MATCH_RECOGNIZE clause
URL: https://github.com/apache/flink/pull/7070
 
 
   This PR adds documentation for MATCH_RECOGNIZE clause in SQL.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-10625) Add MATCH_RECOGNIZE documentation

2018-11-09 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10625?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10625:
---
Labels: pull-request-available  (was: )

> Add MATCH_RECOGNIZE documentation
> -
>
> Key: FLINK-10625
> URL: https://issues.apache.org/jira/browse/FLINK-10625
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation, Table API & SQL
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> The newly added {{MATCH_RECOGNIZE}} functionality needs to be documented.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10625) Add MATCH_RECOGNIZE documentation

2018-11-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16681311#comment-16681311
 ] 

ASF GitHub Bot commented on FLINK-10625:


dawidwys opened a new pull request #7070: [FLINK-10625] Documentation for 
MATCH_RECOGNIZE clause
URL: https://github.com/apache/flink/pull/7070
 
 
   This PR adds documentation for MATCH_RECOGNIZE clause in SQL.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add MATCH_RECOGNIZE documentation
> -
>
> Key: FLINK-10625
> URL: https://issues.apache.org/jira/browse/FLINK-10625
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation, Table API & SQL
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> The newly added {{MATCH_RECOGNIZE}} functionality needs to be documented.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yanghua commented on a change in pull request #6927: [FLINK-10624] Extend SQL client end-to-end to test new KafkaTableSink

2018-11-09 Thread GitBox
yanghua commented on a change in pull request #6927: [FLINK-10624] Extend SQL 
client end-to-end to test new KafkaTableSink
URL: https://github.com/apache/flink/pull/6927#discussion_r232227925
 
 

 ##
 File path: flink-end-to-end-tests/flink-sql-client-test-kafka-0.10/pom.xml
 ##
 @@ -0,0 +1,165 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
 
 Review comment:
   @twalthr This [plugin](https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-sql-client-test/pom.xml#L134) collects jars based on the `classifier`.
   
   We should collect a different set of dependencies for each Kafka connector. We can't test one Kafka connector while depending on several of them: that can cause dependency conflicts across versions, and you can't guarantee that a 0.10 Kafka client is used when testing the Kafka 0.10 connector, nor that the Kafka 2.0 client is used when testing the Kafka 2.0 connector. In fact, you may end up with a combination of the Kafka 2.0 connector and the Kafka 0.10 client. cc @pnowojski 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10624) Extend SQL client end-to-end to test new KafkaTableSink

2018-11-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16681322#comment-16681322
 ] 

ASF GitHub Bot commented on FLINK-10624:


yanghua commented on a change in pull request #6927: [FLINK-10624] Extend SQL 
client end-to-end to test new KafkaTableSink
URL: https://github.com/apache/flink/pull/6927#discussion_r232227925
 
 

 ##
 File path: flink-end-to-end-tests/flink-sql-client-test-kafka-0.10/pom.xml
 ##
 @@ -0,0 +1,165 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
 
 Review comment:
   @twalthr This [plugin](https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-sql-client-test/pom.xml#L134) collects jars based on the `classifier`.
   
   We should collect a different set of dependencies for each Kafka connector. We can't test one Kafka connector while depending on several of them: that can cause dependency conflicts across versions, and you can't guarantee that a 0.10 Kafka client is used when testing the Kafka 0.10 connector, nor that the Kafka 2.0 client is used when testing the Kafka 2.0 connector. In fact, you may end up with a combination of the Kafka 2.0 connector and the Kafka 0.10 client. cc @pnowojski 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Extend SQL client end-to-end to test new KafkaTableSink
> ---
>
> Key: FLINK-10624
> URL: https://issues.apache.org/jira/browse/FLINK-10624
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector, Table API & SQL, Tests
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> With FLINK-9697, we added support for Kafka 2.0. We should also extend the 
> existing streaming client end-to-end test to also test the new 
> {{KafkaTableSink}} against Kafka 2.0.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zentol commented on a change in pull request #6976: [FLINK-7155][metrics] Add new metrics reporter to InfluxDB

2018-11-09 Thread GitBox
zentol commented on a change in pull request #6976: [FLINK-7155][metrics] Add 
new metrics reporter to InfluxDB
URL: https://github.com/apache/flink/pull/6976#discussion_r232226981
 
 

 ##
 File path: 
flink-metrics/flink-metrics-influxdb/src/test/java/org/apache/flink/metrics/influxdb/MetricMapperTest.java
 ##
 @@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.influxdb;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+import org.apache.flink.metrics.Meter;
+
+import org.influxdb.dto.Point;
+import org.junit.Test;
+
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test for {@link MetricMapper} checking that metrics are converted to 
InfluxDB client objects as expected.
+ */
+public class MetricMapperTest {
+
+   private final String name = "a-metric-name";
+   private final MeasurementInfo info = getMeasurementInfo(name);
+   private final Instant timestamp = Instant.now();
+
+   @Test
+   public void testMapGauge() {
+   verifyPoint(
+   MetricMapper.map(info, timestamp, (Gauge) () -> 
42),
+   "value=42");
+
+   verifyPoint(
+   MetricMapper.map(info, timestamp, (Gauge) () -> 
null),
+   "value=null");
+
+   verifyPoint(
+   MetricMapper.map(info, timestamp, (Gauge) () -> 
"hello"),
+   "value=hello");
+
+   verifyPoint(
+   MetricMapper.map(info, timestamp, (Gauge) () -> 
42L),
+   "value=42");
+   }
+
+   @Test
+   public void testMapCounter() {
+   Counter counter = mock(Counter.class);
+   when(counter.getCount()).thenReturn(42L);
+
+   verifyPoint(
+   MetricMapper.map(info, timestamp, counter),
+   "count=42");
+   }
+
+   @Test
+   public void testMapHistogram() {
+   HistogramStatistics statistics = 
mock(HistogramStatistics.class);
 
 Review comment:
   replace mock with actual implementation
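
   For illustration only (not part of the review): a minimal concrete stub that could replace the mock. The class name and the fixed values below are hypothetical; the overridden methods are the abstract methods of Flink's `HistogramStatistics`.

   ```java
   import org.apache.flink.metrics.HistogramStatistics;

   /** Hand-rolled test double with fixed statistics, avoiding Mockito. */
   class TestHistogramStatistics extends HistogramStatistics {
       private final long[] values = {1L, 2L, 3L};

       @Override
       public double getQuantile(double quantile) {
           return 0.5;
       }

       @Override
       public long[] getValues() {
           return values;
       }

       @Override
       public int size() {
           return values.length;
       }

       @Override
       public double getMean() {
           return 2.0;
       }

       @Override
       public double getStdDev() {
           return 1.0;
       }

       @Override
       public long getMax() {
           return 3L;
       }

       @Override
       public long getMin() {
           return 1L;
       }
   }
   ```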


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zentol commented on a change in pull request #6976: [FLINK-7155][metrics] Add new metrics reporter to InfluxDB

2018-11-09 Thread GitBox
zentol commented on a change in pull request #6976: [FLINK-7155][metrics] Add 
new metrics reporter to InfluxDB
URL: https://github.com/apache/flink/pull/6976#discussion_r232227784
 
 

 ##
 File path: 
flink-metrics/flink-metrics-influxdb/src/test/java/org/apache/flink/metrics/influxdb/InfluxdbReporterTest.java
 ##
 @@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.influxdb;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
+import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
+
+import com.github.tomakehurst.wiremock.common.ConsoleNotifier;
+import com.github.tomakehurst.wiremock.junit.WireMockRule;
+import org.junit.Rule;
+import org.junit.Test;
+
+import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
+import static com.github.tomakehurst.wiremock.client.WireMock.containing;
+import static com.github.tomakehurst.wiremock.client.WireMock.equalTo;
+import static com.github.tomakehurst.wiremock.client.WireMock.post;
+import static com.github.tomakehurst.wiremock.client.WireMock.postRequestedFor;
+import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
+import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo;
+import static com.github.tomakehurst.wiremock.client.WireMock.verify;
+import static 
com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig;
+import static org.hamcrest.Matchers.hasEntry;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration test for {@link InfluxdbReporter}.
+ */
+public class InfluxdbReporterTest {
+   private static final String TEST_INFLUXDB_DB = "test-42";
+   private static final String METRIC_HOSTNAME = "task-mgr-1";
+   private static final String METRIC_TM_ID = "tm-id-123";
+
+   @Rule
+   public WireMockRule wireMockRule = new 
WireMockRule(wireMockConfig().dynamicPort().notifier(new 
ConsoleNotifier(false)));
 
 Review comment:
   can be final
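
   For reference, a sketch of that change (JUnit 4 accepts a final `@Rule` field, mirroring the quoted code):

   ```java
   @Rule
   public final WireMockRule wireMockRule =
       new WireMockRule(wireMockConfig().dynamicPort().notifier(new ConsoleNotifier(false)));
   ```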


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zentol commented on a change in pull request #6976: [FLINK-7155][metrics] Add new metrics reporter to InfluxDB

2018-11-09 Thread GitBox
zentol commented on a change in pull request #6976: [FLINK-7155][metrics] Add 
new metrics reporter to InfluxDB
URL: https://github.com/apache/flink/pull/6976#discussion_r232224560
 
 

 ##
 File path: 
flink-metrics/flink-metrics-influxdb/src/main/java/org/apache/flink/metrics/influxdb/InfluxdbReporter.java
 ##
 @@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.influxdb;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.Scheduled;
+
+import org.influxdb.InfluxDB;
+import org.influxdb.InfluxDBFactory;
+import org.influxdb.dto.BatchPoints;
+
+import javax.annotation.Nullable;
+
+import java.time.Instant;
+import java.util.ConcurrentModificationException;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+import static org.apache.flink.metrics.influxdb.InfluxdbReporterOptions.DB;
+import static org.apache.flink.metrics.influxdb.InfluxdbReporterOptions.HOST;
+import static 
org.apache.flink.metrics.influxdb.InfluxdbReporterOptions.PASSWORD;
+import static org.apache.flink.metrics.influxdb.InfluxdbReporterOptions.PORT;
+import static 
org.apache.flink.metrics.influxdb.InfluxdbReporterOptions.USERNAME;
+import static 
org.apache.flink.metrics.influxdb.InfluxdbReporterOptions.getInteger;
+import static 
org.apache.flink.metrics.influxdb.InfluxdbReporterOptions.getString;
+
+/**
+ * {@link MetricReporter} that exports {@link Metric Metrics} via InfluxDB.
+ */
+@Experimental
+public class InfluxdbReporter extends AbstractReporter<MeasurementInfo> implements Scheduled {
+
+   private String database;
+   private InfluxDB influxDB;
+
+   public InfluxdbReporter() {
+   super(new MeasurementInfoProvider());
+   }
+
+   @Override
+   public void open(MetricConfig config) {
+   String host = getString(config, HOST);
+   int port = getInteger(config, PORT);
+   if (host == null || host.isEmpty() || port < 1) {
 
 Review comment:
   also check upper bound for port
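
   A sketch of the suggested validation, reusing the variable names from the quoted code; 65535 is the standard TCP port maximum:

   ```java
   String host = getString(config, HOST);
   int port = getInteger(config, PORT);
   // Reject a missing host and any port outside the valid TCP range 1..65535.
   if (host == null || host.isEmpty() || port < 1 || port > 65535) {
       throw new IllegalArgumentException(
           "Invalid host/port configuration. Host: " + host + " Port: " + port);
   }
   ```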


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-10839) Fix implementation of PojoSerializer.duplicate() w.r.t. subclass serializer

2018-11-09 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-10839:
--

 Summary: Fix implementation of PojoSerializer.duplicate() w.r.t. 
subclass serializer
 Key: FLINK-10839
 URL: https://issues.apache.org/jira/browse/FLINK-10839
 Project: Flink
  Issue Type: Bug
  Components: Type Serialization System
Reporter: Stefan Richter
Assignee: Stefan Richter
 Fix For: 1.6.3, 1.7.0


The implementation of {{PojoSerializer.duplicate()}} currently ignores that the 
subclassSerializerCache is a shared resource. We must always return a new 
instance of the serializer, even if none of the nested field serializers are 
stateful; otherwise concurrency problems can occur because of the cached 
subclass serializers.
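
A generic sketch of the pattern being described (a hypothetical class, not the actual PojoSerializer code): duplicate() must hand each copy its own cache rather than the shared one.

{code:java}
import java.util.HashMap;
import java.util.Map;

/** Hypothetical serializer whose subclass-serializer cache must not be shared. */
class CachingSerializer {
    // Per-instance cache; sharing it across duplicates is not thread-safe.
    private final Map<Class<?>, Object> subclassSerializerCache = new HashMap<>();

    CachingSerializer duplicate() {
        // Always return a fresh instance with an empty cache, even if all
        // nested field serializers are stateless.
        return new CachingSerializer();
    }
}
{code}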



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zentol commented on a change in pull request #6976: [FLINK-7155][metrics] Add new metrics reporter to InfluxDB

2018-11-09 Thread GitBox
zentol commented on a change in pull request #6976: [FLINK-7155][metrics] Add 
new metrics reporter to InfluxDB
URL: https://github.com/apache/flink/pull/6976#discussion_r232226619
 
 

 ##
 File path: 
flink-metrics/flink-metrics-influxdb/src/test/java/org/apache/flink/metrics/influxdb/MeasurementInfoProviderTest.java
 ##
 @@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.influxdb;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
+import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.util.AbstractID;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+
+import static org.hamcrest.Matchers.hasEntry;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyChar;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test for {@link MeasurementInfoProvider}.
+ */
+public class MeasurementInfoProviderTest {
+   private static final Random RANDOM = new Random();
+
+   private final MeasurementInfoProvider provider = new 
MeasurementInfoProvider();
+
+   @Test
+   public void testGetMetricInfo() {
+   // MetricRegistry, required as the first argument in metric groups.
+   MetricRegistryImpl metricRegistry = new MetricRegistryImpl(
+   MetricRegistryConfiguration.fromConfiguration(new 
Configuration()));
+
+   // Create an example, nested metric group: taskmanager -> job -> task
+   // Variables: <host>, <tm_id>
+   String hostname = "loc<>al\"::host\".:";
+   String taskManagerId = "tas:kMana::ger";
+   TaskManagerMetricGroup tmMetricGroup = new 
TaskManagerMetricGroup(
+   metricRegistry, hostname, taskManagerId);
+
+   // Variables: <job_id>, <job_name>
+   JobID jobID = new JobID();
+   String jobName = "testJ\"ob:-!ax..?";
+   TaskManagerJobMetricGroup tmJobMetricGroup = new 
TaskManagerJobMetricGroup(
+   metricRegistry, tmMetricGroup, jobID, jobName);
+
+   // Variables: <task_id>, <task_attempt_id>, <task_name>, <subtask_index>, <task_attempt_num>
+   JobVertexID taskId = new JobVertexID();
+   AbstractID taskAttemptID = new AbstractID();
+   String taskName = "test\"Ta\"..sk";
+   int subtaskIndex = RANDOM.nextInt();
+   int taskAttemptNum = RANDOM.nextInt();
+   TaskMetricGroup taskMetricGroup = new TaskMetricGroup(
+   metricRegistry, tmJobMetricGroup, taskId, 
taskAttemptID, taskName, subtaskIndex, taskAttemptNum);
+
+   String metricName = "testCounter";
+   MetricGroup metricGroup = new FrontMetricGroup<>(0, 
taskMetricGroup);
+
+   MeasurementInfo info = provider.getMetricInfo(metricName, 
metricGroup);
+   assertNotNull(info);
+   assertEquals(
+   String.join("" + 
MeasurementInfoProvider.SCOPE_SEPARATOR, "taskmanager", "job", "task", 
metricName),
+   info.getName());
+   assertThat(info.getTags(), hasEntry("host", hostname));
+   assertThat(info.getTags(), hasEntry("tm_id", taskManagerId));
+   assertThat(info.getTags(), hasEntry("job_id", 
jobID.toString()));
+   assertThat(info.getTags(), hasEntry("job_name", jobName));
+   assertThat(info.getTags(), hasEntry("task_id"

[GitHub] zentol commented on a change in pull request #6976: [FLINK-7155][metrics] Add new metrics reporter to InfluxDB

2018-11-09 Thread GitBox
zentol commented on a change in pull request #6976: [FLINK-7155][metrics] Add 
new metrics reporter to InfluxDB
URL: https://github.com/apache/flink/pull/6976#discussion_r232225906
 
 

 ##
 File path: 
flink-metrics/flink-metrics-influxdb/src/main/java/org/apache/flink/metrics/influxdb/MetricMapper.java
 ##
 @@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.influxdb;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+import org.apache.flink.metrics.Meter;
+
+import org.influxdb.dto.Point;
+
+import java.time.Instant;
+import java.util.concurrent.TimeUnit;
+
+class MetricMapper {
+
+   static Point map(MeasurementInfo info, Instant timestamp, Gauge 
gauge) {
+   Point.Builder builder = builder(info, timestamp);
+   Object value = gauge.getValue();
+   if (value instanceof Number) {
+   builder.addField("value", (Number) value);
+   } else {
+   builder.addField("value", String.valueOf(value));
+   }
+   return builder.build();
+   }
+
+   static Point map(MeasurementInfo info, Instant timestamp, Counter 
counter) {
+   return builder(info, timestamp)
+   .addField("count", counter.getCount())
+   .build();
+   }
+
+   static Point map(MeasurementInfo info, Instant timestamp, Histogram 
histogram) {
+   HistogramStatistics statistics = histogram.getStatistics();
+   return builder(info, timestamp)
+   .addField("run-count", histogram.getCount())
 
 Review comment:
   let's keep the names in sync with the StatsD/Slf4j reporter.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zentol commented on a change in pull request #6976: [FLINK-7155][metrics] Add new metrics reporter to InfluxDB

2018-11-09 Thread GitBox
zentol commented on a change in pull request #6976: [FLINK-7155][metrics] Add 
new metrics reporter to InfluxDB
URL: https://github.com/apache/flink/pull/6976#discussion_r232228849
 
 

 ##
 File path: 
flink-metrics/flink-metrics-influxdb/src/test/java/org/apache/flink/metrics/influxdb/InfluxdbReporterTest.java
 ##
 @@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.influxdb;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
+import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
+
+import com.github.tomakehurst.wiremock.common.ConsoleNotifier;
+import com.github.tomakehurst.wiremock.junit.WireMockRule;
+import org.junit.Rule;
+import org.junit.Test;
+
+import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
+import static com.github.tomakehurst.wiremock.client.WireMock.containing;
+import static com.github.tomakehurst.wiremock.client.WireMock.equalTo;
+import static com.github.tomakehurst.wiremock.client.WireMock.post;
+import static com.github.tomakehurst.wiremock.client.WireMock.postRequestedFor;
+import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
+import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo;
+import static com.github.tomakehurst.wiremock.client.WireMock.verify;
+import static 
com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig;
+import static org.hamcrest.Matchers.hasEntry;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration test for {@link InfluxdbReporter}.
+ */
+public class InfluxdbReporterTest {
+   private static final String TEST_INFLUXDB_DB = "test-42";
+   private static final String METRIC_HOSTNAME = "task-mgr-1";
+   private static final String METRIC_TM_ID = "tm-id-123";
+
+   @Rule
+   public WireMockRule wireMockRule = new 
WireMockRule(wireMockConfig().dynamicPort().notifier(new 
ConsoleNotifier(false)));
+
+   @Test
+   public void test() throws Exception {
+   String configPrefix = ConfigConstants.METRICS_REPORTER_PREFIX + 
"test.";
+   Configuration configuration = new Configuration();
+   configuration.setString(
+   configPrefix + 
ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX,
+   InfluxdbReporter.class.getTypeName());
+   configuration.setString(configPrefix + "host", "localhost");
+   configuration.setString(configPrefix + "port", 
String.valueOf(wireMockRule.port()));
+   configuration.setString(configPrefix + "db", TEST_INFLUXDB_DB);
+
+   MetricRegistryImpl metricRegistry = new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(configuration));
+   MetricReporter reporter = metricRegistry.getReporters().get(0);
+   assertTrue(reporter instanceof InfluxdbReporter);
+   InfluxdbReporter influxdbReporter = (InfluxdbReporter) reporter;
+
+   String metricName = "TestCounter";
+   Counter counter = registerTestMetric(metricName, 
metricRegistry);
+   MeasurementInfo measurementInfo = 
influxdbReporter.counters.get(counter);
+   assertNotNull("test metric must be registered in the reporter", 
measurementInfo);
+   String fullMetricName = "taskmanager_" + metricName;
+   assertEquals(fullMetricName, measurementInfo.getName());
+   assertThat(measurementInfo.getTags(), hasEntry("host", 
METRIC_HOSTNAME));
+   assertThat(measurementInfo.getTags(), hasEntry("tm_id", 
METRIC_TM_ID));
+
+   stubFor(post(urlPath

[GitHub] zentol commented on a change in pull request #6976: [FLINK-7155][metrics] Add new metrics reporter to InfluxDB

2018-11-09 Thread GitBox
zentol commented on a change in pull request #6976: [FLINK-7155][metrics] Add 
new metrics reporter to InfluxDB
URL: https://github.com/apache/flink/pull/6976#discussion_r232228632
 
 

 ##
 File path: 
flink-metrics/flink-metrics-influxdb/src/test/java/org/apache/flink/metrics/influxdb/InfluxdbReporterTest.java
 ##
 @@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.influxdb;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
+import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
+
+import com.github.tomakehurst.wiremock.common.ConsoleNotifier;
+import com.github.tomakehurst.wiremock.junit.WireMockRule;
+import org.junit.Rule;
+import org.junit.Test;
+
+import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
+import static com.github.tomakehurst.wiremock.client.WireMock.containing;
+import static com.github.tomakehurst.wiremock.client.WireMock.equalTo;
+import static com.github.tomakehurst.wiremock.client.WireMock.post;
+import static com.github.tomakehurst.wiremock.client.WireMock.postRequestedFor;
+import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
+import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo;
+import static com.github.tomakehurst.wiremock.client.WireMock.verify;
+import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig;
+import static org.hamcrest.Matchers.hasEntry;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration test for {@link InfluxdbReporter}.
+ */
+public class InfluxdbReporterTest {
+   private static final String TEST_INFLUXDB_DB = "test-42";
+   private static final String METRIC_HOSTNAME = "task-mgr-1";
+   private static final String METRIC_TM_ID = "tm-id-123";
+
+   @Rule
+   public WireMockRule wireMockRule = new WireMockRule(wireMockConfig().dynamicPort().notifier(new ConsoleNotifier(false)));
+
+   @Test
+   public void test() throws Exception {
+   String configPrefix = ConfigConstants.METRICS_REPORTER_PREFIX + "test.";
+   Configuration configuration = new Configuration();
+   configuration.setString(
+   configPrefix + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX,
+   InfluxdbReporter.class.getTypeName());
+   configuration.setString(configPrefix + "host", "localhost");
+   configuration.setString(configPrefix + "port", String.valueOf(wireMockRule.port()));
+   configuration.setString(configPrefix + "db", TEST_INFLUXDB_DB);
+
+   MetricRegistryImpl metricRegistry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(configuration));
+   MetricReporter reporter = metricRegistry.getReporters().get(0);
+   assertTrue(reporter instanceof InfluxdbReporter);
+   InfluxdbReporter influxdbReporter = (InfluxdbReporter) reporter;
+
+   String metricName = "TestCounter";
+   Counter counter = registerTestMetric(metricName, metricRegistry);
+   MeasurementInfo measurementInfo = influxdbReporter.counters.get(counter);
+   assertNotNull("test metric must be registered in the reporter", measurementInfo);
+   String fullMetricName = "taskmanager_" + metricName;
+   assertEquals(fullMetricName, measurementInfo.getName());
+   assertThat(measurementInfo.getTags(), hasEntry("host", METRIC_HOSTNAME));
 
 Review comment:
   this seems unnecessary; the wiremock check would catch this.

---

[GitHub] zentol commented on a change in pull request #6976: [FLINK-7155][metrics] Add new metrics reporter to InfluxDB

2018-11-09 Thread GitBox
zentol commented on a change in pull request #6976: [FLINK-7155][metrics] Add 
new metrics reporter to InfluxDB
URL: https://github.com/apache/flink/pull/6976#discussion_r232228781
 
 

 ##
 File path: 
flink-metrics/flink-metrics-influxdb/src/test/java/org/apache/flink/metrics/influxdb/InfluxdbReporterTest.java
 ##
 @@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.influxdb;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
+import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
+
+import com.github.tomakehurst.wiremock.common.ConsoleNotifier;
+import com.github.tomakehurst.wiremock.junit.WireMockRule;
+import org.junit.Rule;
+import org.junit.Test;
+
+import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
+import static com.github.tomakehurst.wiremock.client.WireMock.containing;
+import static com.github.tomakehurst.wiremock.client.WireMock.equalTo;
+import static com.github.tomakehurst.wiremock.client.WireMock.post;
+import static com.github.tomakehurst.wiremock.client.WireMock.postRequestedFor;
+import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
+import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo;
+import static com.github.tomakehurst.wiremock.client.WireMock.verify;
+import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig;
+import static org.hamcrest.Matchers.hasEntry;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration test for {@link InfluxdbReporter}.
+ */
+public class InfluxdbReporterTest {
+   private static final String TEST_INFLUXDB_DB = "test-42";
+   private static final String METRIC_HOSTNAME = "task-mgr-1";
+   private static final String METRIC_TM_ID = "tm-id-123";
+
+   @Rule
+   public WireMockRule wireMockRule = new WireMockRule(wireMockConfig().dynamicPort().notifier(new ConsoleNotifier(false)));
+
+   @Test
+   public void test() throws Exception {
+   String configPrefix = ConfigConstants.METRICS_REPORTER_PREFIX + "test.";
+   Configuration configuration = new Configuration();
+   configuration.setString(
+   configPrefix + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX,
+   InfluxdbReporter.class.getTypeName());
+   configuration.setString(configPrefix + "host", "localhost");
+   configuration.setString(configPrefix + "port", String.valueOf(wireMockRule.port()));
+   configuration.setString(configPrefix + "db", TEST_INFLUXDB_DB);
+
+   MetricRegistryImpl metricRegistry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(configuration));
+   MetricReporter reporter = metricRegistry.getReporters().get(0);
+   assertTrue(reporter instanceof InfluxdbReporter);
+   InfluxdbReporter influxdbReporter = (InfluxdbReporter) reporter;
+
+   String metricName = "TestCounter";
+   Counter counter = registerTestMetric(metricName, metricRegistry);
+   MeasurementInfo measurementInfo = influxdbReporter.counters.get(counter);
+   assertNotNull("test metric must be registered in the reporter", measurementInfo);
+   String fullMetricName = "taskmanager_" + metricName;
+   assertEquals(fullMetricName, measurementInfo.getName());
+   assertThat(measurementInfo.getTags(), hasEntry("host", METRIC_HOSTNAME));
+   assertThat(measurementInfo.getTags(), hasEntry("tm_id", METRIC_TM_ID));
+
+   stubFor(post(urlPath

[GitHub] zentol commented on a change in pull request #6976: [FLINK-7155][metrics] Add new metrics reporter to InfluxDB

2018-11-09 Thread GitBox
zentol commented on a change in pull request #6976: [FLINK-7155][metrics] Add 
new metrics reporter to InfluxDB
URL: https://github.com/apache/flink/pull/6976#discussion_r232227842
 
 

 ##
 File path: 
flink-metrics/flink-metrics-influxdb/src/test/java/org/apache/flink/metrics/influxdb/InfluxdbReporterTest.java
 ##
 @@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.influxdb;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
+import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
+
+import com.github.tomakehurst.wiremock.common.ConsoleNotifier;
+import com.github.tomakehurst.wiremock.junit.WireMockRule;
+import org.junit.Rule;
+import org.junit.Test;
+
+import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
+import static com.github.tomakehurst.wiremock.client.WireMock.containing;
+import static com.github.tomakehurst.wiremock.client.WireMock.equalTo;
+import static com.github.tomakehurst.wiremock.client.WireMock.post;
+import static com.github.tomakehurst.wiremock.client.WireMock.postRequestedFor;
+import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
+import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo;
+import static com.github.tomakehurst.wiremock.client.WireMock.verify;
+import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig;
+import static org.hamcrest.Matchers.hasEntry;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration test for {@link InfluxdbReporter}.
+ */
+public class InfluxdbReporterTest {
 
 Review comment:
   should extend `TestLogger` (also applies to other tests)
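
   A minimal sketch of that change, assuming the test module can depend on `flink-test-utils-junit`, which provides `org.apache.flink.util.TestLogger`:

   ```java
   import org.apache.flink.util.TestLogger;

   /**
    * Integration test for {@link InfluxdbReporter}.
    */
   public class InfluxdbReporterTest extends TestLogger {
       // TestLogger logs the name of each test as it starts and finishes,
       // which makes failures easier to locate in the CI output.
   }
   ```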


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zentol commented on a change in pull request #6976: [FLINK-7155][metrics] Add new metrics reporter to InfluxDB

2018-11-09 Thread GitBox
zentol commented on a change in pull request #6976: [FLINK-7155][metrics] Add 
new metrics reporter to InfluxDB
URL: https://github.com/apache/flink/pull/6976#discussion_r232228289
 
 

 ##
 File path: 
flink-metrics/flink-metrics-influxdb/src/test/java/org/apache/flink/metrics/influxdb/InfluxdbReporterTest.java
 ##
 @@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.influxdb;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
+import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
+
+import com.github.tomakehurst.wiremock.common.ConsoleNotifier;
+import com.github.tomakehurst.wiremock.junit.WireMockRule;
+import org.junit.Rule;
+import org.junit.Test;
+
+import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
+import static com.github.tomakehurst.wiremock.client.WireMock.containing;
+import static com.github.tomakehurst.wiremock.client.WireMock.equalTo;
+import static com.github.tomakehurst.wiremock.client.WireMock.post;
+import static com.github.tomakehurst.wiremock.client.WireMock.postRequestedFor;
+import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
+import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo;
+import static com.github.tomakehurst.wiremock.client.WireMock.verify;
+import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig;
+import static org.hamcrest.Matchers.hasEntry;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration test for {@link InfluxdbReporter}.
+ */
+public class InfluxdbReporterTest {
+   private static final String TEST_INFLUXDB_DB = "test-42";
+   private static final String METRIC_HOSTNAME = "task-mgr-1";
+   private static final String METRIC_TM_ID = "tm-id-123";
+
+   @Rule
+   public WireMockRule wireMockRule = new WireMockRule(wireMockConfig().dynamicPort().notifier(new ConsoleNotifier(false)));
+
+   @Test
+   public void test() throws Exception {
+   String configPrefix = ConfigConstants.METRICS_REPORTER_PREFIX + "test.";
+   Configuration configuration = new Configuration();
+   configuration.setString(
+   configPrefix + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX,
+   InfluxdbReporter.class.getTypeName());
+   configuration.setString(configPrefix + "host", "localhost");
+   configuration.setString(configPrefix + "port", String.valueOf(wireMockRule.port()));
+   configuration.setString(configPrefix + "db", TEST_INFLUXDB_DB);
+
+   MetricRegistryImpl metricRegistry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(configuration));
+   MetricReporter reporter = metricRegistry.getReporters().get(0);
+   assertTrue(reporter instanceof InfluxdbReporter);
+   InfluxdbReporter influxdbReporter = (InfluxdbReporter) reporter;
+
+   String metricName = "TestCounter";
+   Counter counter = registerTestMetric(metricName, metricRegistry);
+   MeasurementInfo measurementInfo = influxdbReporter.counters.get(counter);
+   assertNotNull("test metric must be registered in the reporter", measurementInfo);
+   String fullMetricName = "taskmanager_" + metricName;
+   assertEquals(fullMetricName, measurementInfo.getName());
+   assertThat(measurementInfo.getTags(), hasEntry("host", METRIC_HOSTNAME));
+   assertThat(measurementInfo.getTags(), hasEntry("tm_id", METRIC_TM_ID));
+
+   stubFor(post(urlPath

[GitHub] zentol commented on a change in pull request #6976: [FLINK-7155][metrics] Add new metrics reporter to InfluxDB

2018-11-09 Thread GitBox
zentol commented on a change in pull request #6976: [FLINK-7155][metrics] Add 
new metrics reporter to InfluxDB
URL: https://github.com/apache/flink/pull/6976#discussion_r232227048
 
 

 ##
 File path: 
flink-metrics/flink-metrics-influxdb/src/test/java/org/apache/flink/metrics/influxdb/MetricMapperTest.java
 ##
 @@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.influxdb;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+import org.apache.flink.metrics.Meter;
+
+import org.influxdb.dto.Point;
+import org.junit.Test;
+
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test for {@link MetricMapper} checking that metrics are converted to InfluxDB client objects as expected.
+ */
+public class MetricMapperTest {
+
+   private final String name = "a-metric-name";
+   private final MeasurementInfo info = getMeasurementInfo(name);
+   private final Instant timestamp = Instant.now();
+
+   @Test
+   public void testMapGauge() {
+   verifyPoint(
+   MetricMapper.map(info, timestamp, (Gauge) () -> 42),
+   "value=42");
+
+   verifyPoint(
+   MetricMapper.map(info, timestamp, (Gauge) () -> null),
+   "value=null");
+
+   verifyPoint(
+   MetricMapper.map(info, timestamp, (Gauge) () -> "hello"),
+   "value=hello");
+
+   verifyPoint(
+   MetricMapper.map(info, timestamp, (Gauge) () -> 42L),
+   "value=42");
+   }
+
+   @Test
+   public void testMapCounter() {
+   Counter counter = mock(Counter.class);
+   when(counter.getCount()).thenReturn(42L);
+
+   verifyPoint(
+   MetricMapper.map(info, timestamp, counter),
+   "count=42");
+   }
+
+   @Test
+   public void testMapHistogram() {
+   HistogramStatistics statistics = mock(HistogramStatistics.class);
+   when(statistics.getMax()).thenReturn(-5L);
+   when(statistics.getMin()).thenReturn(50L);
+   when(statistics.getMean()).thenReturn(1.2);
+   when(statistics.getStdDev()).thenReturn(0.7);
+   when(statistics.getQuantile(.5)).thenReturn(1.0);
+   when(statistics.getQuantile(.75)).thenReturn(2.0);
+   when(statistics.getQuantile(.95)).thenReturn(3.0);
+   when(statistics.getQuantile(.98)).thenReturn(4.0);
+   when(statistics.getQuantile(.99)).thenReturn(5.0);
+   when(statistics.getQuantile(.999)).thenReturn(6.0);
+
+   Histogram histogram = mock(Histogram.class);
 
 Review comment:
   replace the mock with an actual implementation
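
   For illustration, one way to do that without Mockito is a small fixed-value `HistogramStatistics` subclass plus a trivial `Histogram` around it; this is only a sketch, and the hard-coded values simply mirror what the mock returned above:

   ```java
   // Sketch only: a hand-rolled Histogram replacing the Mockito mock.
   HistogramStatistics statistics = new HistogramStatistics() {
       @Override
       public double getQuantile(double quantile) {
           // Same quantile-to-value mapping the mock encoded.
           if (quantile == .5) { return 1.0; }
           if (quantile == .75) { return 2.0; }
           if (quantile == .95) { return 3.0; }
           if (quantile == .98) { return 4.0; }
           if (quantile == .99) { return 5.0; }
           if (quantile == .999) { return 6.0; }
           return 0.0;
       }

       @Override
       public long[] getValues() { return new long[0]; }

       @Override
       public int size() { return 0; }

       @Override
       public double getMean() { return 1.2; }

       @Override
       public double getStdDev() { return 0.7; }

       @Override
       public long getMax() { return -5L; }

       @Override
       public long getMin() { return 50L; }
   };

   Histogram histogram = new Histogram() {
       @Override
       public void update(long value) {}

       @Override
       public long getCount() { return 42L; }

       @Override
       public HistogramStatistics getStatistics() { return statistics; }
   };
   ```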


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zentol commented on a change in pull request #6976: [FLINK-7155][metrics] Add new metrics reporter to InfluxDB

2018-11-09 Thread GitBox
zentol commented on a change in pull request #6976: [FLINK-7155][metrics] Add 
new metrics reporter to InfluxDB
URL: https://github.com/apache/flink/pull/6976#discussion_r232226784
 
 

 ##
 File path: 
flink-metrics/flink-metrics-influxdb/src/test/java/org/apache/flink/metrics/influxdb/MeasurementInfoProviderTest.java
 ##
 @@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.influxdb;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
+import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.util.AbstractID;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+
+import static org.hamcrest.Matchers.hasEntry;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyChar;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test for {@link MeasurementInfoProvider}.
+ */
+public class MeasurementInfoProviderTest {
+   private static final Random RANDOM = new Random();
+
+   private final MeasurementInfoProvider provider = new MeasurementInfoProvider();
+
+   @Test
+   public void testGetMetricInfo() {
 
 Review comment:
   this test seems unnecessary.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zentol commented on a change in pull request #6976: [FLINK-7155][metrics] Add new metrics reporter to InfluxDB

2018-11-09 Thread GitBox
zentol commented on a change in pull request #6976: [FLINK-7155][metrics] Add 
new metrics reporter to InfluxDB
URL: https://github.com/apache/flink/pull/6976#discussion_r232227097
 
 

 ##
 File path: 
flink-metrics/flink-metrics-influxdb/src/test/java/org/apache/flink/metrics/influxdb/MetricMapperTest.java
 ##
 @@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.influxdb;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+import org.apache.flink.metrics.Meter;
+
+import org.influxdb.dto.Point;
+import org.junit.Test;
+
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test for {@link MetricMapper} checking that metrics are converted to InfluxDB client objects as expected.
+ */
+public class MetricMapperTest {
+
+   private final String name = "a-metric-name";
+   private final MeasurementInfo info = getMeasurementInfo(name);
+   private final Instant timestamp = Instant.now();
+
+   @Test
+   public void testMapGauge() {
+   verifyPoint(
+   MetricMapper.map(info, timestamp, (Gauge) () -> 42),
+   "value=42");
+
+   verifyPoint(
+   MetricMapper.map(info, timestamp, (Gauge) () -> null),
+   "value=null");
+
+   verifyPoint(
+   MetricMapper.map(info, timestamp, (Gauge) () -> "hello"),
+   "value=hello");
+
+   verifyPoint(
+   MetricMapper.map(info, timestamp, (Gauge) () -> 42L),
+   "value=42");
+   }
+
+   @Test
+   public void testMapCounter() {
+   Counter counter = mock(Counter.class);
+   when(counter.getCount()).thenReturn(42L);
+
+   verifyPoint(
+   MetricMapper.map(info, timestamp, counter),
+   "count=42");
+   }
+
+   @Test
+   public void testMapHistogram() {
+   HistogramStatistics statistics = mock(HistogramStatistics.class);
+   when(statistics.getMax()).thenReturn(-5L);
+   when(statistics.getMin()).thenReturn(50L);
+   when(statistics.getMean()).thenReturn(1.2);
+   when(statistics.getStdDev()).thenReturn(0.7);
+   when(statistics.getQuantile(.5)).thenReturn(1.0);
+   when(statistics.getQuantile(.75)).thenReturn(2.0);
+   when(statistics.getQuantile(.95)).thenReturn(3.0);
+   when(statistics.getQuantile(.98)).thenReturn(4.0);
+   when(statistics.getQuantile(.99)).thenReturn(5.0);
+   when(statistics.getQuantile(.999)).thenReturn(6.0);
+
+   Histogram histogram = mock(Histogram.class);
+   when(histogram.getStatistics()).thenReturn(statistics);
+   when(histogram.getCount()).thenReturn(42L);
+
+   verifyPoint(
+   MetricMapper.map(info, timestamp, histogram),
+   "50-percentile=1.0",
+   "75-percentile=2.0",
+   "95-percentile=3.0",
+   "98-percentile=4.0",
+   "99-percentile=5.0",
+   "999-percentile=6.0",
+   "count=0",
+   "max=-5",
+   "mean=1.2",
+   "min=50",
+   "run-count=42",
+   "std-dev=0.7");
+   }
+
+   @Test
+   public void testMapMeter() {
+   Meter meter = mock(Meter.class);
 
 Review comment:
   replace the mock with an actual implementation
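
   A sketch of the same idea for the `Meter` case (the quoted diff is cut off before the stubbed values, so the numbers below are placeholders):

   ```java
   // Sketch only: a hand-rolled Meter with fixed placeholder values.
   Meter meter = new Meter() {
       @Override
       public void markEvent() {}

       @Override
       public void markEvent(long n) {}

       @Override
       public double getRate() { return 2.3; }

       @Override
       public long getCount() { return 42L; }
   };
   ```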


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

[GitHub] zentol commented on a change in pull request #6976: [FLINK-7155][metrics] Add new metrics reporter to InfluxDB

2018-11-09 Thread GitBox
zentol commented on a change in pull request #6976: [FLINK-7155][metrics] Add 
new metrics reporter to InfluxDB
URL: https://github.com/apache/flink/pull/6976#discussion_r232226938
 
 

 ##
 File path: 
flink-metrics/flink-metrics-influxdb/src/test/java/org/apache/flink/metrics/influxdb/MetricMapperTest.java
 ##
 @@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.influxdb;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+import org.apache.flink.metrics.Meter;
+
+import org.influxdb.dto.Point;
+import org.junit.Test;
+
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test for {@link MetricMapper} checking that metrics are converted to 
InfluxDB client objects as expected.
+ */
+public class MetricMapperTest {
+
+   private final String name = "a-metric-name";
+   private final MeasurementInfo info = getMeasurementInfo(name);
+   private final Instant timestamp = Instant.now();
+
+   @Test
+   public void testMapGauge() {
+   verifyPoint(
+   MetricMapper.map(info, timestamp, (Gauge) () -> 
42),
+   "value=42");
+
+   verifyPoint(
+   MetricMapper.map(info, timestamp, (Gauge) () -> 
null),
+   "value=null");
+
+   verifyPoint(
+   MetricMapper.map(info, timestamp, (Gauge) () -> 
"hello"),
+   "value=hello");
+
+   verifyPoint(
+   MetricMapper.map(info, timestamp, (Gauge) () -> 
42L),
+   "value=42");
+   }
+
+   @Test
+   public void testMapCounter() {
+   Counter counter = mock(Counter.class);
 
 Review comment:
   Use a `SimpleCounter` and set its value manually to 42.
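
   In code, the suggestion amounts to something like this (`SimpleCounter` is already imported by the test class):

   ```java
   // Replace the Mockito mock with the real lightweight implementation.
   Counter counter = new SimpleCounter();
   counter.inc(42L);

   verifyPoint(
       MetricMapper.map(info, timestamp, counter),
       "count=42");
   ```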


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-7155) Add Influxdb metrics reporter

2018-11-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7155?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16681327#comment-16681327
 ] 

ASF GitHub Bot commented on FLINK-7155:
---

zentol commented on a change in pull request #6976: [FLINK-7155][metrics] Add 
new metrics reporter to InfluxDB
URL: https://github.com/apache/flink/pull/6976#discussion_r232224560
 
 

 ##
 File path: 
flink-metrics/flink-metrics-influxdb/src/main/java/org/apache/flink/metrics/influxdb/InfluxdbReporter.java
 ##
 @@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.influxdb;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.Scheduled;
+
+import org.influxdb.InfluxDB;
+import org.influxdb.InfluxDBFactory;
+import org.influxdb.dto.BatchPoints;
+
+import javax.annotation.Nullable;
+
+import java.time.Instant;
+import java.util.ConcurrentModificationException;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+import static org.apache.flink.metrics.influxdb.InfluxdbReporterOptions.DB;
+import static org.apache.flink.metrics.influxdb.InfluxdbReporterOptions.HOST;
+import static org.apache.flink.metrics.influxdb.InfluxdbReporterOptions.PASSWORD;
+import static org.apache.flink.metrics.influxdb.InfluxdbReporterOptions.PORT;
+import static org.apache.flink.metrics.influxdb.InfluxdbReporterOptions.USERNAME;
+import static org.apache.flink.metrics.influxdb.InfluxdbReporterOptions.getInteger;
+import static org.apache.flink.metrics.influxdb.InfluxdbReporterOptions.getString;
+
+/**
+ * {@link MetricReporter} that exports {@link Metric Metrics} via InfluxDB.
+ */
+@Experimental
+public class InfluxdbReporter extends AbstractReporter implements Scheduled {
+
+   private String database;
+   private InfluxDB influxDB;
+
+   public InfluxdbReporter() {
+   super(new MeasurementInfoProvider());
+   }
+
+   @Override
+   public void open(MetricConfig config) {
+   String host = getString(config, HOST);
+   int port = getInteger(config, PORT);
+   if (host == null || host.isEmpty() || port < 1) {
 
 Review comment:
   also check upper bound for port
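
   A sketch of the suggested check, with 65535 as the highest valid TCP port (the exception type and message here are illustrative, not part of the PR):

   ```java
   if (host == null || host.isEmpty() || port < 1 || port > 65535) {
       throw new IllegalArgumentException(
           "Invalid host/port configuration. Host: " + host + " Port: " + port);
   }
   ```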


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add Influxdb metrics reporter
> -
>
> Key: FLINK-7155
> URL: https://issues.apache.org/jira/browse/FLINK-7155
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Reporter: Patrick Lucas
>Assignee: Patrick Lucas
>Priority: Major
>  Labels: pull-request-available
>
> [~jgrier] has a [simple Influxdb metrics reporter for Flink|https://github.com/jgrier/flink-stuff/tree/master/flink-influx-reporter] that is a thin wrapper around [a lightweight, public-domain Influxdb reporter|https://github.com/davidB/metrics-influxdb] for Codahale metrics.
> We can implement this very easily in Java in the same way as flink-metrics-graphite.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7155) Add Influxdb metrics reporter

2018-11-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7155?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16681331#comment-16681331
 ] 

ASF GitHub Bot commented on FLINK-7155:
---

zentol commented on a change in pull request #6976: [FLINK-7155][metrics] Add 
new metrics reporter to InfluxDB
URL: https://github.com/apache/flink/pull/6976#discussion_r232226938
 
 

 ##
 File path: 
flink-metrics/flink-metrics-influxdb/src/test/java/org/apache/flink/metrics/influxdb/MetricMapperTest.java
 ##
 @@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.influxdb;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+import org.apache.flink.metrics.Meter;
+
+import org.influxdb.dto.Point;
+import org.junit.Test;
+
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test for {@link MetricMapper} checking that metrics are converted to InfluxDB client objects as expected.
+ */
+public class MetricMapperTest {
+
+   private final String name = "a-metric-name";
+   private final MeasurementInfo info = getMeasurementInfo(name);
+   private final Instant timestamp = Instant.now();
+
+   @Test
+   public void testMapGauge() {
+   verifyPoint(
+   MetricMapper.map(info, timestamp, (Gauge) () -> 42),
+   "value=42");
+
+   verifyPoint(
+   MetricMapper.map(info, timestamp, (Gauge) () -> null),
+   "value=null");
+
+   verifyPoint(
+   MetricMapper.map(info, timestamp, (Gauge) () -> "hello"),
+   "value=hello");
+
+   verifyPoint(
+   MetricMapper.map(info, timestamp, (Gauge) () -> 42L),
+   "value=42");
+   }
+
+   @Test
+   public void testMapCounter() {
+   Counter counter = mock(Counter.class);
 
 Review comment:
   Use a `SimpleCounter` and set its value manually to 42.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add Influxdb metrics reporter
> -
>
> Key: FLINK-7155
> URL: https://issues.apache.org/jira/browse/FLINK-7155
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Reporter: Patrick Lucas
>Assignee: Patrick Lucas
>Priority: Major
>  Labels: pull-request-available
>
> [~jgrier] has a [simple Influxdb metrics reporter for Flink|https://github.com/jgrier/flink-stuff/tree/master/flink-influx-reporter] that is a thin wrapper around [a lightweight, public-domain Influxdb reporter|https://github.com/davidB/metrics-influxdb] for Codahale metrics.
> We can implement this very easily in Java in the same way as flink-metrics-graphite.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

