[jira] [Created] (FLINK-10685) Support history server on YARN

2018-10-25 Thread vinoyang (JIRA)
vinoyang created FLINK-10685:


 Summary: Support history server on YARN
 Key: FLINK-10685
 URL: https://issues.apache.org/jira/browse/FLINK-10685
 Project: Flink
  Issue Type: Improvement
  Components: YARN
Reporter: vinoyang
Assignee: vinoyang






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10680) Unable to set negative offsets for TumblingEventTimeWindow

2018-10-25 Thread Paul Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10680?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Paul Lin updated FLINK-10680:
-
Labels: pull-request-available  (was: )

> Unable to set negative offsets for TumblingEventTimeWindow
> --
>
> Key: FLINK-10680
> URL: https://issues.apache.org/jira/browse/FLINK-10680
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.6.1
>Reporter: Paul Lin
>Priority: Major
>  Labels: pull-request-available
>
> The following code given in documentation throws an IllegalArgumentException: 
> TumblingEventTimeWindows parameters must satisfy 0 <= offset < size.
> > TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8));
> By design, the offset could be negative to fit in different time zones.





[jira] [Created] (FLINK-10684) Improve the CSV reading process

2018-10-25 Thread Xingcan Cui (JIRA)
Xingcan Cui created FLINK-10684:
---

 Summary: Improve the CSV reading process
 Key: FLINK-10684
 URL: https://issues.apache.org/jira/browse/FLINK-10684
 Project: Flink
  Issue Type: Improvement
  Components: Core
Reporter: Xingcan Cui


CSV is one of the most commonly used file formats in data wrangling. To load 
records from CSV files, Flink has provided the basic {{CsvInputFormat}}, as 
well as some variants (e.g., {{RowCsvInputFormat}} and {{PojoCsvInputFormat}}). 
However, it seems that the reading process can be improved. For example, we 
could add a built-in util to automatically infer schemas from CSV headers and 
samples of data. Also, the current bad record handling method can be improved 
by somehow keeping the invalid lines (and even the reasons for failed parsing), 
instead of logging the total number only.

This is an umbrella issue for all the improvements and bug fixes for the CSV 
reading process.
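
To make the bad-record idea concrete, here is a minimal sketch in plain Java. It does not use Flink's {{CsvInputFormat}}; the class names, the comma-only splitting, and the all-integer schema are assumptions chosen to keep the example short. Instead of only counting failures, it keeps each invalid line together with the reason parsing failed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; not a Flink class.
final class CsvBadRecordSketch {

    // An invalid line together with the reason parsing failed.
    static final class BadRecord {
        final int lineNumber;
        final String line;
        final String reason;

        BadRecord(int lineNumber, String line, String reason) {
            this.lineNumber = lineNumber;
            this.line = line;
            this.reason = reason;
        }
    }

    // Parsed rows plus the rejected lines, kept side by side.
    static final class Result {
        final List<long[]> rows = new ArrayList<>();
        final List<BadRecord> badRecords = new ArrayList<>();
    }

    // Parses comma-separated lines whose fields are all expected to be integers.
    static Result parse(List<String> lines, int fieldCount) {
        Result result = new Result();
        for (int i = 0; i < lines.size(); i++) {
            String line = lines.get(i);
            String[] fields = line.split(",", -1);
            if (fields.length != fieldCount) {
                result.badRecords.add(new BadRecord(i + 1, line,
                        "expected " + fieldCount + " fields but found " + fields.length));
                continue;
            }
            try {
                long[] row = new long[fieldCount];
                for (int j = 0; j < fieldCount; j++) {
                    row[j] = Long.parseLong(fields[j].trim());
                }
                result.rows.add(row);
            } catch (NumberFormatException e) {
                // Keep the offending line and the parser's message instead of dropping them.
                result.badRecords.add(new BadRecord(i + 1, line, e.getMessage()));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Result r = parse(Arrays.asList("1,2", "x,3", "4"), 2);
        for (BadRecord b : r.badRecords) {
            System.out.println("line " + b.lineNumber + ": " + b.reason);
        }
    }
}
```

A real implementation would also have to bound how many bad records it retains, since a badly malformed file could otherwise exhaust memory.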





[jira] [Created] (FLINK-10683) Error while executing BLOB connection. java.io.IOException: Unknown operation

2018-10-25 Thread Yee (JIRA)
Yee created FLINK-10683:
---

 Summary: Error while executing BLOB connection. java.io.IOException: Unknown operation
 Key: FLINK-10683
 URL: https://issues.apache.org/jira/browse/FLINK-10683
 Project: Flink
  Issue Type: Bug
Reporter: Yee


ERROR org.apache.flink.runtime.blob.BlobServerConnection - Error while executing BLOB connection.
java.io.IOException: Unknown operation 5
    at org.apache.flink.runtime.blob.BlobServerConnection.run(BlobServerConnection.java:136)
2018-10-26 01:49:35,247 ERROR org.apache.flink.runtime.blob.BlobServerConnection - Error while executing BLOB connection.
java.io.IOException: Unknown operation 18
    at org.apache.flink.runtime.blob.BlobServerConnection.run(BlobServerConnection.java:136)
2018-10-26 01:49:35,550 ERROR org.apache.flink.runtime.blob.BlobServerConnection - PUT operation failed
java.io.IOException: Unknown type of BLOB addressing.
    at org.apache.flink.runtime.blob.BlobServerConnection.put(BlobServerConnection.java:347)
    at org.apache.flink.runtime.blob.BlobServerConnection.run(BlobServerConnection.java:127)
2018-10-26 01:49:35,854 ERROR org.apache.flink.runtime.blob.BlobServerConnection - Error while executing BLOB connection.
java.io.IOException: Unknown operation 3
    at org.apache.flink.runtime.blob.BlobServerConnection.run(BlobServerConnection.java:136)
2018-10-26 01:49:36,159 ERROR org.apache.flink.runtime.blob.BlobServerConnection - PUT operation failed
java.io.IOException: Unexpected number of incoming bytes: 50353152
    at org.apache.flink.runtime.blob.BlobServerConnection.put(BlobServerConnection.java:368)
    at org.apache.flink.runtime.blob.BlobServerConnection.run(BlobServerConnection.java:127)
2018-10-26 01:49:36,463 ERROR org.apache.flink.runtime.blob.BlobServerConnection - Error while executing BLOB connection.
java.io.IOException: Unknown operation 105
    at org.apache.flink.runtime.blob.BlobServerConnection.run(BlobServerConnection.java:136)
2018-10-26 01:49:36,765 ERROR org.apache.flink.runtime.blob.BlobServerConnection - Error while executing BLOB connection.
java.io.IOException: Unknown operation 71
    at org.apache.flink.runtime.blob.BlobServerConnection.run(BlobServerConnection.java:136)
2018-10-26 01:49:37,069 ERROR org.apache.flink.runtime.blob.BlobServerConnection - Error while executing BLOB connection.
java.io.IOException: Unknown operation 128
    at org.apache.flink.runtime.blob.BlobServerConnection.run(BlobServerConnection.java:136)
2018-10-26 01:49:37,373 ERROR org.apache.flink.runtime.blob.BlobServerConnection - PUT operation failed
java.io.IOException: Unexpected number of incoming bytes: 4302592
    at org.apache.flink.runtime.blob.BlobServerConnection.put(BlobServerConnection.java:368)
    at org.apache.flink.runtime.blob.BlobServerConnection.run(BlobServerConnection.java:127)
2018-10-26 01:49:37,676 ERROR org.apache.flink.runtime.blob.BlobServerConnection - Error while executing BLOB connection.
java.io.IOException: Unknown operation 115
    at org.apache.flink.runtime.blob.BlobServerConnection.run(BlobServerConnection.java:136)
2018-10-26 01:49:37,980 ERROR org.apache.flink.runtime.blob.BlobServerConnection - Error while executing BLOB connection.
java.io.IOException: Unknown operation 71
    at org.apache.flink.runtime.blob.BlobServerConnection.run(BlobServerConnection.java:136)





[jira] [Commented] (FLINK-10680) Unable to set negative offsets for TumblingEventTimeWindow

2018-10-25 Thread Paul Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16664559#comment-16664559
 ] 

Paul Lin commented on FLINK-10680:
--

[~hequn8128] Thanks for the pointer. I looked into the commit logs and found 
that this parameter check was added in a refactoring of tests, 
[FLINK-4994|https://issues.apache.org/jira/browse/FLINK-4994], which does not 
describe any change to the offset API. So I think it is more likely a bug 
than an intended change.






[jira] [Commented] (FLINK-10680) Unable to set negative offsets for TumblingEventTimeWindow

2018-10-25 Thread Hequn Cheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16664527#comment-16664527
 ] 

Hequn Cheng commented on FLINK-10680:
-

[~Paul Lin] Hi, thanks for your contribution. I think we should correct the 
documentation rather than change the code. With a one-day window, an offset of 
`-8` hours and an offset of `16` hours yield the same window start. This 
problem has been discussed in 
[FLINK-10284|https://issues.apache.org/jira/browse/FLINK-10284].
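
The equivalence of the two offsets can be checked with a few lines of arithmetic. The sketch below is standalone Java, not Flink code: the class and method names are made up for illustration, and the formula is an assumption modeled on the usual way of aligning a timestamp to a window boundary (subtract the remainder of the shifted timestamp modulo the window size).

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration only; not part of Flink's API.
final class WindowStartSketch {

    // Aligns a timestamp to the start of its tumbling window.
    // Assumes timestampMs - offsetMs + sizeMs is non-negative, so Java's %
    // behaves like a mathematical modulo.
    static long windowStart(long timestampMs, long offsetMs, long sizeMs) {
        return timestampMs - (timestampMs - offsetMs + sizeMs) % sizeMs;
    }

    public static void main(String[] args) {
        long day = TimeUnit.DAYS.toMillis(1);
        long ts = 1_540_512_345_678L; // an arbitrary instant on 2018-10-26 (UTC)

        long startMinus8 = windowStart(ts, TimeUnit.HOURS.toMillis(-8), day);
        long startPlus16 = windowStart(ts, TimeUnit.HOURS.toMillis(16), day);

        // Both offsets differ by exactly one window size, so the remainders agree.
        System.out.println(startMinus8 == startPlus16);
    }
}
```

Because -8 hours and 16 hours differ by exactly one window length (24 hours), the two remainders are equal and so are the window starts.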






[jira] [Commented] (FLINK-10680) Unable to set negative offsets for TumblingEventTimeWindow

2018-10-25 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16664519#comment-16664519
 ] 

vinoyang commented on FLINK-10680:
--

[~Paul Lin] OK, I can help review it when I have time. I thought you just 
wanted to report this issue.






[jira] [Assigned] (FLINK-10680) Unable to set negative offsets for TumblingEventTimeWindow

2018-10-25 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10680?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-10680:


Assignee: (was: vinoyang)






[jira] [Commented] (FLINK-10680) Unable to set negative offsets for TumblingEventTimeWindow

2018-10-25 Thread Paul Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16664508#comment-16664508
 ] 

Paul Lin commented on FLINK-10680:
--

Hi [~yanghua], I've created a PR for this. Would you please have a look? 

https://github.com/apache/flink/pull/6932






[GitHub] link3280 opened a new pull request #6932: [Flink-10680][streaming] Fix IllegalArgumentException caused by negative offset in TumblingWindows

2018-10-25 Thread GitBox
link3280 opened a new pull request #6932: [Flink-10680][streaming] Fix 
IllegalArgumentException caused by negative offset in TumblingWindows
URL: https://github.com/apache/flink/pull/6932
 
 
   ## What is the purpose of the change
   
   By design, the offset could be either positive or negative to fit in 
different time zones, but there's a check that rejects negative offsets. 
   
   This PR fixes the IllegalArgumentException caused by negative offsets in 
TumblingEventTimeWindows and TumblingProcessingTimeWindows.
   
   ## Brief change log
   
   - Change the condition of the parameter checks in TumblingEventTimeWindows 
and TumblingProcessingTimeWindows from `0 <= offset < size` to `0 <= 
abs(offset) < size`.
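
The relaxed condition can be pictured as follows. This is a sketch of the rule stated in the change log, not the code in the PR; the class and method names are hypothetical.

```java
// Hypothetical standalone validator; mirrors the condition 0 <= abs(offset) < size.
final class OffsetCheckSketch {

    static void checkOffset(long offsetMs, long sizeMs) {
        // Math.abs is non-negative for every long except Long.MIN_VALUE, which is
        // not a meaningful offset, so only the upper bound is tested here.
        if (Math.abs(offsetMs) >= sizeMs) {
            throw new IllegalArgumentException(
                    "Window parameters must satisfy abs(offset) < size, got offset="
                            + offsetMs + ", size=" + sizeMs);
        }
    }

    public static void main(String[] args) {
        long hour = 60L * 60 * 1000;
        long day = 24 * hour;

        checkOffset(-8 * hour, day); // accepted under the relaxed rule
        try {
            checkOffset(-day, day);  // rejected: abs(offset) equals size
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```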
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
   - Updated the tests for invalid parameters for time window assigners.
   - Added tests for negative offsets of time windows.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: yes
 - The serializers: no 
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature?  no
 - If yes, how is the feature documented? not applicable
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Assigned] (FLINK-10680) Unable to set negative offsets for TumblingEventTimeWindow

2018-10-25 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10680?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-10680:


Assignee: vinoyang






[jira] [Issue Comment Deleted] (FLINK-10276) Job Manager and Task Manager Metrics Reporter Ports Configuration

2018-10-25 Thread Stavros Kontopoulos (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10276?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stavros Kontopoulos updated FLINK-10276:

Comment: was deleted

(was: This might be a problem on K8s too, which does not cover port ranges: 
[https://github.com/prometheus/prometheus/issues/3756])

> Job Manager and Task Manager Metrics Reporter Ports Configuration
> -
>
> Key: FLINK-10276
> URL: https://issues.apache.org/jira/browse/FLINK-10276
> Project: Flink
>  Issue Type: New Feature
>  Components: Core
>Reporter: Deirdre Kong
>Priority: Major
>
> *Problem Statement:*
> When deploying Flink using YARN, the job manager and task manager can be on 
> the same node or different nodes.  Say I specify the port range to be 
> 9249-9250, if JM and TM are deployed on the same node, the port for JM will 
> be 9249 and the port for TM will be 9250.  If JM and TM are deployed on 
> different nodes, then the ports for JM and TM will be 9249.
> I can only configure Prometheus once for the ports to scrape JM and TM 
> metrics.  In this case, I won't know whether port 9249 is for JM or TM.  It 
> would be great if we could specify in flink-conf.yaml the ports we want for 
> the JM reporter and the TM reporters.
> *Comment from Till:*
> I think we could extend Vino's proposal for Yarn as well: Maybe it makes 
> sense to allow to override certain configuration settings for the 
> TaskManagers when deploying on Yarn. That way one could define a fixed port 
> for the JM and a port range for the TMs. Having such a distinction you can 
> configure your Prometheus to scrape for the single JM and the TMs 
> individually. However, Flink does not yet support such a feature. You can 
> open a JIRA issue to track the problem.
>  
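
The port assignment described in the problem statement can be reproduced with a small simulation. The sketch assumes each reporter takes the first port in the configured range that is still free on its node; the class and method names are hypothetical, and no sockets are opened.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical simulation; it models port selection, it does not bind sockets.
final class PortRangeSketch {

    // Returns the first port in [lo, hi] not yet used on the node and marks it as used.
    static int claimFirstFreePort(int lo, int hi, Set<Integer> usedOnNode) {
        for (int port = lo; port <= hi; port++) {
            if (usedOnNode.add(port)) {
                return port;
            }
        }
        throw new IllegalStateException("No free port in range " + lo + "-" + hi);
    }

    public static void main(String[] args) {
        // JM and TM on the same node compete for the same range.
        Set<Integer> sharedNode = new HashSet<>();
        int jmSameNode = claimFirstFreePort(9249, 9250, sharedNode);
        int tmSameNode = claimFirstFreePort(9249, 9250, sharedNode);

        // On separate nodes each process starts with an untouched range.
        int jmOwnNode = claimFirstFreePort(9249, 9250, new HashSet<>());
        int tmOwnNode = claimFirstFreePort(9249, 9250, new HashSet<>());

        System.out.println(jmSameNode + " " + tmSameNode + " " + jmOwnNode + " " + tmOwnNode);
    }
}
```

The port number alone therefore does not identify the role of the process behind it, which is the ambiguity the issue asks to remove.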





[jira] [Commented] (FLINK-10276) Job Manager and Task Manager Metrics Reporter Ports Configuration

2018-10-25 Thread Stavros Kontopoulos (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16664420#comment-16664420
 ] 

Stavros Kontopoulos commented on FLINK-10276:
-

This might be a problem on K8s two, which does not cover port ranges: 
https://github.com/prometheus/prometheus/issues/3756






[jira] [Comment Edited] (FLINK-10276) Job Manager and Task Manager Metrics Reporter Ports Configuration

2018-10-25 Thread Stavros Kontopoulos (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16664420#comment-16664420
 ] 

Stavros Kontopoulos edited comment on FLINK-10276 at 10/25/18 11:28 PM:


This might be a problem on K8s too, which does not cover port ranges: 
[https://github.com/prometheus/prometheus/issues/3756]


was (Author: skonto):
This might be a problem on K8s two, which does not cover port ranges: 
https://github.com/prometheus/prometheus/issues/3756






[jira] [Created] (FLINK-10682) EOFException occurs during deserialization of Avro class

2018-10-25 Thread Ben La Monica (JIRA)
Ben La Monica created FLINK-10682:
-

 Summary: EOFException occurs during deserialization of Avro class
 Key: FLINK-10682
 URL: https://issues.apache.org/jira/browse/FLINK-10682
 Project: Flink
  Issue Type: Bug
  Components: Type Serialization System
Affects Versions: 1.5.4
 Environment: AWS EMR 5.17 (upgraded to Flink 1.5.4)
3 task managers, 1 job manager running in YARN in Hadoop
Running on Amazon Linux with OpenJDK 1.8
Reporter: Ben La Monica


I'm running into a failure, usually after about an hour of processing in a 
StreamExecutionEnvironment, that produces the message below. I'm at a loss for 
what is causing it. I'm running this in AWS on EMR 5.17 with 3 task managers 
and a job manager in a YARN cluster, and I've upgraded my Flink 
libraries to 1.5.4 to bypass another serialization issue and the Kerberos auth 
issues.

The Avro classes that are being deserialized were generated with Avro 1.8.2.
{code:java}
2018-10-22 16:12:10,680 [INFO ] class=o.a.flink.runtime.taskmanager.Task thread="Calculate Estimated NAV -> Split into single messages (3/10)" Calculate Estimated NAV -> Split into single messages (3/10) (de7d8fa7784903a475391d0168d56f2e) switched from RUNNING to FAILED.
java.io.EOFException: null
    at org.apache.flink.core.memory.DataInputDeserializer.readLong(DataInputDeserializer.java:219)
    at org.apache.flink.core.memory.DataInputDeserializer.readDouble(DataInputDeserializer.java:138)
    at org.apache.flink.formats.avro.utils.DataInputDecoder.readDouble(DataInputDecoder.java:70)
    at org.apache.avro.io.ResolvingDecoder.readDouble(ResolvingDecoder.java:190)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:186)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
    at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
    at org.apache.avro.generic.GenericDatumReader.readArray(GenericDatumReader.java:266)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:177)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
    at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
    at org.apache.flink.formats.avro.typeutils.AvroSerializer.deserialize(AvroSerializer.java:172)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:208)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:49)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:140)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:208)
    at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:116)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
    at java.lang.Thread.run(Thread.java:748)
{code}
Do you have any ideas on how I could further troubleshoot this issue?





[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams

2018-10-25 Thread Devin Thomson (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16664305#comment-16664305
 ] 

Devin Thomson commented on FLINK-4582:
--

[~yxu-lyft] [~tzulitai]

Hey guys! Devin from Tinder Engineering here 
(https://www.linkedin.com/in/devin-thomson-44a35651/).

I've been following this ticket for a couple of weeks now. We have a strong 
need for this at Tinder so I went ahead and built it! Here's a PR:

[https://github.com/tinder-dthomson/flink/pull/3]

I am not a contributor so I had to fork flink. If you want to take a look and 
let me know what you think, I'd be glad to contribute this back to the 
community!

Also [~yxu-lyft] I don't mean to step on your toes here - if you have a better 
solution I am of course happy to use that instead!

 

- Devin

> Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
> 
>
> Key: FLINK-4582
> URL: https://issues.apache.org/jira/browse/FLINK-4582
> Project: Flink
>  Issue Type: New Feature
>  Components: Kinesis Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Ying Xu
>Priority: Major
>
> AWS DynamoDB is a NoSQL database service that has a CDC-like (change data 
> capture) feature called DynamoDB Streams 
> (http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html),
>  which is a stream feed of item-level table activities.
> The DynamoDB Streams shard abstraction follows that of Kinesis Streams with 
> only a slight difference in resharding behaviours, so it is possible to build 
> on the internals of our Flink Kinesis Consumer for an exactly-once DynamoDB 
> Streams source.
> I propose an API something like this:
> {code}
> DataStream dynamoItemsCdc = 
>   FlinkKinesisConsumer.asDynamoDBStream(tableNames, schema, config)
> {code}
> The feature adds more connectivity to popular AWS services for Flink, and 
> combining what Flink has for exactly-once semantics, out-of-core state 
> backends, and queryable state with CDC can have very strong use cases. For 
> this feature there should only be an extra dependency to the AWS Java SDK for 
> DynamoDB, which has Apache License 2.0.





[jira] [Comment Edited] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams

2018-10-25 Thread Devin Thomson (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16664305#comment-16664305
 ] 

Devin Thomson edited comment on FLINK-4582 at 10/25/18 9:23 PM:


[~yxu-lyft] [~tzulitai]

Hey guys! Devin from Tinder Engineering here 
([https://www.linkedin.com/in/devin-thomson-44a35651/]).

I've been following this ticket for a couple of weeks now. We have a strong 
need for this at Tinder so I went ahead and built it! Here's a PR:

[https://github.com/tinder-dthomson/flink/pull/3]

I am not a contributor so I had to fork flink. If you want to take a look and 
let me know what you think, I'd be glad to contribute this back to the 
community!

Also [~yxu-lyft] I don't mean to step on your toes here - if you have a better 
solution I am of course happy to use that instead!

 

Devin


was (Author: tinder-dthomson):
[~yxu-lyft] [~tzulitai]

Hey guys! Devin from Tinder Engineering here 
(https://www.linkedin.com/in/devin-thomson-44a35651/).

I've been following this ticket for a couple of weeks now. We have a strong 
need for this at Tinder so I went ahead and built it! Here's a PR:

[https://github.com/tinder-dthomson/flink/pull/3]

I am not a contributor so I had to fork flink. If you want to take a look and 
let me know what you think, I'd be glad to contribute this back to the 
community!

Also [~yxu-lyft] I don't mean to step on your toes here - if you have a better 
solution I am of course happy to use that instead!

 

- Devin






[jira] [Updated] (FLINK-6444) Add a check that '@VisibleForTesting' methods are only used in tests

2018-10-25 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-6444?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-6444:
--
Labels: pull-request-available  (was: )

> Add a check that '@VisibleForTesting' methods are only used in tests
> 
>
> Key: FLINK-6444
> URL: https://issues.apache.org/jira/browse/FLINK-6444
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Stephan Ewen
>Assignee: zhangminglei
>Priority: Major
>  Labels: pull-request-available
>
> Some methods are annotated with {{@VisibleForTesting}}. These methods should 
> only be called from tests.
> This is currently not enforced / checked during the build. We should add such 
> a check.
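
One way to start such a check is to collect the annotated methods by reflection. The sketch below covers only that first step; finding call sites outside of tests would need bytecode or source analysis, which is not shown. The annotation is redeclared locally with runtime retention so that the example is self-contained, and all names are hypothetical.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration; not the check proposed for the Flink build.
final class VisibleForTestingScanSketch {

    // Local stand-in annotation; runtime retention makes it visible to reflection.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface VisibleForTesting {}

    // Small sample class with one annotated and one regular method.
    static final class Sample {
        void process() {}

        @VisibleForTesting
        void resetForTest() {}
    }

    // Lists the methods declared on the class that carry the annotation.
    static List<String> annotatedMethods(Class<?> clazz) {
        List<String> names = new ArrayList<>();
        for (Method m : clazz.getDeclaredMethods()) {
            if (m.isAnnotationPresent(VisibleForTesting.class)) {
                names.add(clazz.getSimpleName() + "#" + m.getName());
            }
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(annotatedMethods(Sample.class));
    }
}
```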





[jira] [Commented] (FLINK-6444) Add a check that '@VisibleForTesting' methods are only used in tests

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16664278#comment-16664278
 ] 

ASF GitHub Bot commented on FLINK-6444:
---

zentol closed pull request #4705: [FLINK-6444] [build] Add a check that 
'@VisibleForTesting' methods ar…
URL: https://github.com/apache/flink/pull/4705
 
 
   


[GitHub] zentol closed pull request #4705: [FLINK-6444] [build] Add a check that '@VisibleForTesting' methods ar…

2018-10-25 Thread GitBox
zentol closed pull request #4705: [FLINK-6444] [build] Add a check that 
'@VisibleForTesting' methods ar…
URL: https://github.com/apache/flink/pull/4705
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 46c821edfa2..eb7dc3106f6 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -439,7 +439,7 @@ public CheckpointingMode getCheckpointingMode() {
 * from operations on {@link 
org.apache.flink.streaming.api.datastream.KeyedStream}) is maintained
 * (heap, managed memory, externally), and where state 
snapshots/checkpoints are stored, both for
 * the key/value state, and for checkpointed functions (implementing 
the interface
-* {@link org.apache.flink.streaming.api.checkpoint.Checkpointed}).
+* {@link org.apache.flink.streaming.api.checkpoint.ListCheckpointed}).
 *
 * The {@link 
org.apache.flink.runtime.state.memory.MemoryStateBackend} for example
 * maintains the state in heap memory, as objects. It is lightweight 
without extra dependencies,
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/manual/CheckVisibleForTestingUsage.java
 
b/flink-tests/src/test/java/org/apache/flink/test/manual/CheckVisibleForTestingUsage.java
new file mode 100644
index 000..cb5f2cbecba
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/manual/CheckVisibleForTestingUsage.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.manual;
+
+import org.apache.flink.annotation.VisibleForTesting;
+
+import org.junit.Test;
+import org.reflections.Reflections;
+import org.reflections.scanners.MemberUsageScanner;
+import org.reflections.scanners.MethodAnnotationsScanner;
+import org.reflections.util.ClasspathHelper;
+import org.reflections.util.ConfigurationBuilder;
+
+import java.lang.reflect.Member;
+import java.lang.reflect.Method;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * This test checks that methods annotated with @VisibleForTesting are not 
called from classes
+ * that do not belong to the tests. These methods should only be called 
from tests.
+ */
+public class CheckVisibleForTestingUsage {
+
+   @Test
+   public void testCheckVisibleForTesting() throws Exception {
+   ConfigurationBuilder configurationBuilder = new 
ConfigurationBuilder()
+   
.useParallelExecutor(Runtime.getRuntime().availableProcessors())
+   
.addUrls(ClasspathHelper.forPackage("org.apache.flink.core"))
+   .addScanners(
+   new MemberUsageScanner(),
+   new MethodAnnotationsScanner());
+
+   final Reflections reflections = new 
Reflections(configurationBuilder);
+
+   Set<Method> methods = 
reflections.getMethodsAnnotatedWith(VisibleForTesting.class);
+
+   for (Method method : methods) {
+   Set<Member> usages = reflections.getMethodUsage(method);
+   for (Member member : usages) {
+   if (member instanceof Method) {
+   Method visibleForTestingUsageScope = 
(Method) member;
+   if 
(!visibleForTestingUsageScope.isAnnotationPresent(Test.class)) {
+   assertEquals("Unexpected calls: 
" + visibleForTestingUsageScope.getDeclaringClass() + "#" + 
visibleForTestingUsageScope.getName(),
+  

[jira] [Updated] (FLINK-6857) Add global default Kryo serializer configuration to StreamExecutionEnvironment

2018-10-25 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-6857?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-6857:
--
Labels: pull-request-available  (was: )

> Add global default Kryo serializer configuration to StreamExecutionEnvironment
> --
>
> Key: FLINK-6857
> URL: https://issues.apache.org/jira/browse/FLINK-6857
> Project: Flink
>  Issue Type: Improvement
>  Components: Configuration, Type Serialization System
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: zhangminglei
>Priority: Major
>  Labels: pull-request-available
>
> See ML for original discussion: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/KryoException-Encountered-unregistered-class-ID-td13476.html.
> We should have an additional {{setDefaultKryoSerializer}} method that allows 
> overriding the global default serializer that is not tied to specific classes 
> (out-of-the-box Kryo uses the {{FieldSerializer}} if no matches for default 
> serializer settings can be found for a class). Internally in Flink's 
> {{KryoSerializer}}, this would only be a matter of proxying that configured 
> global default serializer for Kryo by calling 
> {{Kryo.setDefaultSerializer(...)}} on the created Kryo instance.
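
The difference between per-class default serializers and a global default
can be sketched with a small stand-alone registry. This is an illustration
only: DefaultSerializerRegistry is a hypothetical class, serializers are
represented by plain names, and neither Kryo nor Flink is used.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical registry; it only models the lookup order discussed above.
final class DefaultSerializerRegistry {

    // Per-class defaults, checked in registration order.
    private final Map<Class<?>, String> perType = new LinkedHashMap<>();

    // Global fallback used when no per-class default matches.
    private String globalDefault = "FieldSerializer";

    // Ties a default serializer to a class and its subclasses.
    void addDefaultSerializer(Class<?> type, String serializerName) {
        perType.put(type, serializerName);
    }

    // Overrides the global fallback, which is not tied to any class.
    void setDefaultSerializer(String serializerName) {
        globalDefault = serializerName;
    }

    // Returns the first matching per-class default, else the global one.
    String resolve(Class<?> type) {
        for (Map.Entry<Class<?>, String> entry : perType.entrySet()) {
            if (entry.getKey().isAssignableFrom(type)) {
                return entry.getValue();
            }
        }
        return globalDefault;
    }
}
```

With this model, registering a default for Number affects Integer but not
String, while changing the global default affects every class that has no
matching per-class entry.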



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6857) Add global default Kryo serializer configuration to StreamExecutionEnvironment

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16664270#comment-16664270
 ] 

ASF GitHub Bot commented on FLINK-6857:
---

zentol closed pull request #4166: [FLINK-6857] [types] Add global default Kryo 
serializer configuration…
URL: https://github.com/apache/flink/pull/4166
 
 
   


[GitHub] zentol closed pull request #4166: [FLINK-6857] [types] Add global default Kryo serializer configuration…

2018-10-25 Thread GitBox
zentol closed pull request #4166: [FLINK-6857] [types] Add global default Kryo 
serializer configuration…
URL: https://github.com/apache/flink/pull/4166
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java 
b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 26e6af1e945..1b02176fe5d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -18,7 +18,9 @@
 
 package org.apache.flink.api.common;
 
+import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.Serializer;
+
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
@@ -146,6 +148,8 @@
 */
private long taskCancellationTimeoutMillis = -1;
 
+   private Kryo kryo = new Kryo();
+
// --- User code values 

 
private GlobalJobParameters globalJobParameters;
@@ -678,7 +682,8 @@ public void setGlobalJobParameters(GlobalJobParameters 
globalJobParameters) {
 * Adds a new Kryo default serializer to the Runtime.
 *
 * Note that the serializer instance must be serializable (as defined 
by java.io.Serializable),
-* because it may be distributed to the worker nodes by java 
serialization.
+* because it may be distributed to the worker nodes by java 
serialization. Also, this method
+* is tied to a specific class, corresponding to the 
addDefaultSerializer method in Kryo.
 *
 * @param type The class of the types serialized with the given 
serializer.
 * @param serializer The serializer to use.
@@ -694,6 +699,9 @@ public void setGlobalJobParameters(GlobalJobParameters 
globalJobParameters) {
/**
 * Adds a new Kryo default serializer to the Runtime.
 *
+* Note that this method is tied to a specific class, corresponding
+* to the addDefaultSerializer method in Kryo.
+*
 * @param type The class of the types serialized with the given 
serializer.
 * @param serializerClass The class of the serializer to use.
 */
@@ -704,6 +712,25 @@ public void addDefaultKryoSerializer(Class type, 
Class type, Class> serializerClass) {
+   if (type == null || serializerClass == null) {
+   throw new NullPointerException("Cannot register null 
class or serializer.");
+   }
+   kryo.newInstance(type);
+   kryo.setDefaultSerializer(serializerClass);
+   defaultKryoSerializerClasses.put(type, serializerClass);
+   }
+
/**
 * Registers the given type with a Kryo Serializer.
 *
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 46c821edfa2..b4f328a8907 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -559,6 +559,22 @@ public void addDefaultKryoSerializer(Class type, 
ClassNote that this method is different from {@link 
#addDefaultKryoSerializer(Class, Class)},
+* you can specify your own serializer class to use when no {@link 
#addDefaultKryoSerializer(Class, Class)
+* default serializers} match an object's type.
+*
+* @param type
+*  The class of the types serialized with the given 
serializer.
+* @param serializerClass
+*  The class of the serializer to use.
+*/
+   public void setDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass) {
+   config.setDefaultKryoSerializer(type, serializerClass);
+   }
+
/**
 * Registers the given type with a Kryo Serializer.
 *


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10681) elasticsearch6.ElasticsearchSinkITCase fails if wrong JNA library installed

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16664189#comment-16664189
 ] 

ASF GitHub Bot commented on FLINK-10681:


zentol commented on a change in pull request #6928: [FLINK-10681] Harden 
ElasticsearchSinkITCase against wrong JNA library
URL: https://github.com/apache/flink/pull/6928#discussion_r228302547
 
 

 ##
 File path: flink-connectors/flink-connector-elasticsearch6/pom.xml
 ##
 @@ -278,6 +278,9 @@ under the License.
maven-surefire-plugin
2.12.2

+   
+   true
 
 Review comment:
   is this only relevant for tests or also in production?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> elasticsearch6.ElasticsearchSinkITCase fails if wrong JNA library installed
> ---
>
> Key: FLINK-10681
> URL: https://issues.apache.org/jira/browse/FLINK-10681
> Project: Flink
>  Issue Type: Bug
>  Components: ElasticSearch Connector, Tests
>Affects Versions: 1.6.1, 1.7.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.6.3, 1.7.0
>
>
> The {{elasticsearch6.ElasticsearchSinkITCase}} fails on systems where a wrong 
> JNA library is installed.
> {code}
> There is an incompatible JNA native library installed on this system
> Expected: 5.2.0
> Found:4.0.0
> /usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib.
> To resolve this issue you may do one of the following:
>  - remove or uninstall the offending library
>  - set the system property jna.nosys=true
>  - set jna.boot.library.path to include the path to the version of the
>jnidispatch library included with the JNA jar file you are using
> at com.sun.jna.Native.<clinit>(Native.java:199)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:264)
> at org.elasticsearch.bootstrap.Natives.<clinit>(Natives.java:45)
> at 
> org.elasticsearch.bootstrap.BootstrapInfo.isMemoryLocked(BootstrapInfo.java:50)
> at 
> org.elasticsearch.monitor.process.ProcessProbe.processInfo(ProcessProbe.java:130)
> at 
> org.elasticsearch.monitor.process.ProcessService.<init>(ProcessService.java:44)
> at 
> org.elasticsearch.monitor.MonitorService.<init>(MonitorService.java:48)
> at org.elasticsearch.node.Node.<init>(Node.java:363)
> at 
> org.apache.flink.streaming.connectors.elasticsearch.EmbeddedElasticsearchNodeEnvironmentImpl$PluginNode.<init>(EmbeddedElasticsearchNodeEnvironmentImpl.java:85)
> at 
> org.apache.flink.streaming.connectors.elasticsearch.EmbeddedElasticsearchNodeEnvironmentImpl.start(EmbeddedElasticsearchNodeEnvironmentImpl.java:53)
> at 
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase.prepare(ElasticsearchSinkTestBase.java:73)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
> at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> at 
> org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:252)
> at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:141)
> 

[jira] [Commented] (FLINK-9808) Implement state conversion procedure in state backends

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16664026#comment-16664026
 ] 

ASF GitHub Bot commented on FLINK-9808:
---

tzulitai removed a comment on issue #6931: [FLINK-9808] [state-backends] 
Migrate state when necessary in state backends
URL: https://github.com/apache/flink/pull/6931#issuecomment-433123169
 
 
   cc @StefanRRichter & @kl0u
   
   This PR subsumes #6875. Stefan, I've also addressed your latest comments in 
the previous PR.
   What is left to address is 1) a better migration abstraction layering, and 
2) a set of migration tests that is less implementation-sensitive.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Implement state conversion procedure in state backends
> --
>
> Key: FLINK-9808
> URL: https://issues.apache.org/jira/browse/FLINK-9808
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Aljoscha Krettek
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> With FLINK-9377 in place, config snapshots serve as the single source 
> of truth for recreating restore serializers; the next step would be to 
> utilize this when performing a full-pass state conversion (i.e., read with 
> the old / restore serializer, write with the new serializer).
> For Flink's heap-based backends, state conversion happens inherently, 
> since all state is always deserialized after restore with 
> the restore serializer, and written with the new serializer on snapshots.
> For the RocksDB state backend, since state is lazily deserialized, state 
> conversion needs to happen for each registered state on its first access if 
> the registered new serializer has a different serialization schema than the 
> previous serializer.
> This task should consist of three parts:
> 1. Allow {{CompatibilityResult}} to correctly distinguish between whether the 
> new serializer's schema is a) compatible with the serializer as it is, b) 
> compatible after the serializer has been reconfigured, or c) incompatible.
> 2. Introduce state conversion procedures in the RocksDB state backend. This 
> should occur on the first state access.
> 3. Make sure that all other backends no longer do redundant serializer 
> compatibility checks. That is not required because those backends always 
> perform full-pass state conversions.
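
The second part, conversion on first access, can be sketched roughly as
follows. This is a stand-alone illustration under simplifying assumptions,
not the RocksDB backend's code: LazilyMigratedState is a hypothetical class,
the "old" schema stores an int as decimal text, and the "new" schema stores
it as four big-endian bytes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical state holder that keeps serialized bytes and converts them
// from the old schema to the new one on the first read.
final class LazilyMigratedState {

    private final Map<String, byte[]> entries = new HashMap<>();
    private boolean migrated = false;

    // Counts full conversion passes, to show that conversion happens once.
    int conversionPasses = 0;

    // Old schema (assumed): the value as decimal text.
    static byte[] writeOld(int value) {
        return Integer.toString(value).getBytes(StandardCharsets.UTF_8);
    }

    static int readOld(byte[] bytes) {
        return Integer.parseInt(new String(bytes, StandardCharsets.UTF_8));
    }

    // New schema (assumed): four bytes, big-endian.
    static byte[] writeNew(int value) {
        return ByteBuffer.allocate(4).putInt(value).array();
    }

    static int readNew(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getInt();
    }

    // Simulates restoring an entry that was written with the old serializer.
    void restore(String key, int value) {
        entries.put(key, writeOld(value));
    }

    // Reads a value; the first call triggers the conversion of all entries.
    int get(String key) {
        migrateIfNeeded();
        return readNew(entries.get(key));
    }

    // Read with the old serializer, write with the new one, exactly once.
    private void migrateIfNeeded() {
        if (migrated) {
            return;
        }
        for (Map.Entry<String, byte[]> entry : entries.entrySet()) {
            entry.setValue(writeNew(readOld(entry.getValue())));
        }
        migrated = true;
        conversionPasses++;
    }
}
```

Restoring itself stays cheap because nothing is deserialized; the cost of the
conversion is paid on the first access and not again afterwards.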



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9808) Implement state conversion procedure in state backends

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16663989#comment-16663989
 ] 

ASF GitHub Bot commented on FLINK-9808:
---

tzulitai closed pull request #6875: [FLINK-9808] [state backends] Migrate state 
when necessary in state backends
URL: https://github.com/apache/flink/pull/6875
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
index 7a1675eafba..b8282fe238c 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
@@ -22,6 +22,7 @@
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.StateMigrationException;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -238,7 +239,8 @@
} else if (compat.isCompatibleAfterMigration()) {
return CompatibilityResult.requiresMigration();
} else if (compat.isIncompatible()) {
-   throw new IllegalStateException("The new 
serializer is incompatible.");
+   throw new RuntimeException(
+   new StateMigrationException("The new 
serializer is incompatible, meaning that the new serializer can't be used even 
if state migration is performed."));
} else {
throw new IllegalStateException("Unidentifiable 
schema compatibility type. This is a bug, please file a JIRA.");
}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index eae5a3bccdd..ae4fbaa133b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -24,9 +24,9 @@
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.FSDataInputStream;
@@ -228,42 +228,32 @@ public void dispose() {
 
final StateMetaInfoSnapshot metaInfoSnapshot = 
restoredBroadcastStateMetaInfos.get(name);
 
-   @SuppressWarnings("unchecked")
-   RegisteredBroadcastStateBackendMetaInfo 
restoredMetaInfo = new RegisteredBroadcastStateBackendMetaInfo(metaInfoSnapshot);
+   // check whether new serializers are incompatible
+   TypeSerializerSnapshot keySerializerSnapshot = 
Preconditions.checkNotNull(
+   (TypeSerializerSnapshot) 
metaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER));
 
-   // check compatibility to determine if state migration 
is required
-   CompatibilityResult keyCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
-   restoredMetaInfo.getKeySerializer(),
-   UnloadableDummyTypeSerializer.class,
-   //TODO this keys should not be exposed 
and should be adapted after FLINK-9377 was merged
-   
metaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER),
-   broadcastStateKeySerializer);
-
-   CompatibilityResult valueCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
-   restoredMetaInfo.getValueSerializer(),
-   

[jira] [Commented] (FLINK-9808) Implement state conversion procedure in state backends

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16663988#comment-16663988
 ] 

ASF GitHub Bot commented on FLINK-9808:
---

tzulitai commented on issue #6875: [FLINK-9808] [state backends] Migrate state 
when necessary in state backends
URL: https://github.com/apache/flink/pull/6875#issuecomment-433123440
 
 
   This PR is now subsumed by #6931.
   All comments have been addressed in the new PR.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Implement state conversion procedure in state backends
> --
>
> Key: FLINK-9808
> URL: https://issues.apache.org/jira/browse/FLINK-9808
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Aljoscha Krettek
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> With FLINK-9377 in place and config snapshots serving as the single source 
> of truth for recreating restore serializers, the next step would be to 
> utilize this when performing a full-pass state conversion (i.e., read with 
> the old / restore serializer, write with the new serializer).
> For Flink's heap-based backends, state conversion 
> inherently happens, since all state is always deserialized after restore with 
> the restore serializer, and written with the new serializer on snapshots.
> For the RocksDB state backend, since state is lazily deserialized, state 
> conversion needs to happen for each registered state on its first access if 
> the newly registered serializer has a different serialization schema than the 
> previous serializer.
> This task should consist of three parts:
> 1. Allow {{CompatibilityResult}} to correctly distinguish between whether the 
> new serializer's schema is a) compatible with the serializer as it is, b) 
> compatible after the serializer has been reconfigured, or c) incompatible.
> 2. Introduce state conversion procedures in the RocksDB state backend. This 
> should occur on the first state access.
> 3. Make sure that all other backends no longer do redundant serializer 
> compatibility checks. That is not required because those backends always 
> perform full-pass state conversions.





[jira] [Commented] (FLINK-9808) Implement state conversion procedure in state backends

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16663986#comment-16663986
 ] 

ASF GitHub Bot commented on FLINK-9808:
---

tzulitai commented on issue #6931: [FLINK-9808] [state-backends] Migrate state 
when necessary in state backends
URL: https://github.com/apache/flink/pull/6931#issuecomment-433123169
 
 
   cc @StefanRRichter & @kl0u
   
   This PR subsumes #6875. Stefan, I've also addressed your latest comments in 
the previous PR.
   What is left to address is 1) a better migration abstraction layering, and 
2) a set of migration tests that is less implementation-sensitive.




> Implement state conversion procedure in state backends
> --
>
> Key: FLINK-9808
> URL: https://issues.apache.org/jira/browse/FLINK-9808
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Aljoscha Krettek
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> With FLINK-9377 in place and config snapshots serving as the single source 
> of truth for recreating restore serializers, the next step would be to 
> utilize this when performing a full-pass state conversion (i.e., read with 
> the old / restore serializer, write with the new serializer).
> For Flink's heap-based backends, state conversion 
> inherently happens, since all state is always deserialized after restore with 
> the restore serializer, and written with the new serializer on snapshots.
> For the RocksDB state backend, since state is lazily deserialized, state 
> conversion needs to happen for each registered state on its first access if 
> the newly registered serializer has a different serialization schema than the 
> previous serializer.
> This task should consist of three parts:
> 1. Allow {{CompatibilityResult}} to correctly distinguish between whether the 
> new serializer's schema is a) compatible with the serializer as it is, b) 
> compatible after the serializer has been reconfigured, or c) incompatible.
> 2. Introduce state conversion procedures in the RocksDB state backend. This 
> should occur on the first state access.
> 3. Make sure that all other backends no longer do redundant serializer 
> compatibility checks. That is not required because those backends always 
> perform full-pass state conversions.





[GitHub] tzulitai closed pull request #6875: [FLINK-9808] [state backends] Migrate state when necessary in state backends

2018-10-25 Thread GitBox
tzulitai closed pull request #6875: [FLINK-9808] [state backends] Migrate state 
when necessary in state backends
URL: https://github.com/apache/flink/pull/6875
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
index 7a1675eafba..b8282fe238c 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
@@ -22,6 +22,7 @@
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.StateMigrationException;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -238,7 +239,8 @@
} else if (compat.isCompatibleAfterMigration()) {
return CompatibilityResult.requiresMigration();
} else if (compat.isIncompatible()) {
-   throw new IllegalStateException("The new 
serializer is incompatible.");
+   throw new RuntimeException(
+   new StateMigrationException("The new 
serializer is incompatible, meaning that the new serializer can't be used even 
if state migration is performed."));
} else {
throw new IllegalStateException("Unidentifiable 
schema compatibility type. This is a bug, please file a JIRA.");
}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index eae5a3bccdd..ae4fbaa133b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -24,9 +24,9 @@
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.FSDataInputStream;
@@ -228,42 +228,32 @@ public void dispose() {
 
final StateMetaInfoSnapshot metaInfoSnapshot = 
restoredBroadcastStateMetaInfos.get(name);
 
-   @SuppressWarnings("unchecked")
-   RegisteredBroadcastStateBackendMetaInfo 
restoredMetaInfo = new RegisteredBroadcastStateBackendMetaInfo(metaInfoSnapshot);
+   // check whether new serializers are incompatible
+   TypeSerializerSnapshot keySerializerSnapshot = 
Preconditions.checkNotNull(
+   (TypeSerializerSnapshot) 
metaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER));
 
-   // check compatibility to determine if state migration 
is required
-   CompatibilityResult keyCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
-   restoredMetaInfo.getKeySerializer(),
-   UnloadableDummyTypeSerializer.class,
-   //TODO this keys should not be exposed 
and should be adapted after FLINK-9377 was merged
-   
metaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER),
-   broadcastStateKeySerializer);
-
-   CompatibilityResult valueCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
-   restoredMetaInfo.getValueSerializer(),
-   UnloadableDummyTypeSerializer.class,
-   //TODO this keys should not be exposed 
and should be adapted after FLINK-9377 was merged
-   
metaInfoSnapshot.getTypeSerializerConfigSnapshot(

[GitHub] tzulitai commented on issue #6931: [FLINK-9808] [state-backends] Migrate state when necessary in state backends

2018-10-25 Thread GitBox
tzulitai commented on issue #6931: [FLINK-9808] [state-backends] Migrate state 
when necessary in state backends
URL: https://github.com/apache/flink/pull/6931#issuecomment-433123169
 
 
   cc @StefanRRichter & @kl0u
   
   This PR subsumes #6875. Stefan, I've also addressed your latest comments in 
the previous PR.
   What is left to address is 1) a better migration abstraction layering, and 
2) a set of migration tests that is less implementation-sensitive.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tzulitai commented on issue #6875: [FLINK-9808] [state backends] Migrate state when necessary in state backends

2018-10-25 Thread GitBox
tzulitai commented on issue #6875: [FLINK-9808] [state backends] Migrate state 
when necessary in state backends
URL: https://github.com/apache/flink/pull/6875#issuecomment-433123440
 
 
   This PR is now subsumed by #6931.
   All comments have been addressed in the new PR.




[jira] [Commented] (FLINK-9808) Implement state conversion procedure in state backends

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16663983#comment-16663983
 ] 

ASF GitHub Bot commented on FLINK-9808:
---

tzulitai opened a new pull request #6931: [FLINK-9808] [state-backends] Migrate 
state when necessary
URL: https://github.com/apache/flink/pull/6931
 
 
   ## What is the purpose of the change
   
   This PR is based on #6930. Only the last commit is relevant.
   
   This PR adds the procedure of migrating state (i.e., reading state bytes 
with the restored prior serializer, and then re-writing the deserialized object 
with the new serializer).
   
   - This should only ever occur in the RocksDB state backend, since the 
restore / snapshotting of heap-based backends is already a migration process 
by nature. This is identified by a result of 
`TypeSerializerSchemaCompatibility.compatibleAfterMigration()` on the state 
serializer's compatibility check.
   - For heap-based backends, we only need to check that the new state 
serializer isn't incompatible. This is identified by a result of 
`TypeSerializerSchemaCompatibility.incompatible()` on the state serializer's 
compatibility check.
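
   As a rough illustration of the migration step described above (read the stored bytes with the prior serializer, then re-write the deserialized object with the new one), here is a minimal, self-contained sketch. It is not the code from this PR and does not use Flink's `TypeSerializer`; the `SimpleSerializer` interface, the two example serializers and the `migrate` helper are hypothetical names introduced only for this example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class StateMigrationSketch {

    /** Hypothetical stand-in for a serializer; this is NOT Flink's TypeSerializer. */
    public interface SimpleSerializer<T> {
        void serialize(T value, DataOutputStream out) throws IOException;
        T deserialize(DataInputStream in) throws IOException;
    }

    /** Assumed "prior" schema: a long stored as its decimal string. */
    public static final SimpleSerializer<Long> PRIOR_SERIALIZER = new SimpleSerializer<Long>() {
        @Override
        public void serialize(Long value, DataOutputStream out) throws IOException {
            out.writeUTF(Long.toString(value));
        }

        @Override
        public Long deserialize(DataInputStream in) throws IOException {
            return Long.parseLong(in.readUTF());
        }
    };

    /** Assumed "new" schema: the same long stored as 8 raw bytes. */
    public static final SimpleSerializer<Long> NEW_SERIALIZER = new SimpleSerializer<Long>() {
        @Override
        public void serialize(Long value, DataOutputStream out) throws IOException {
            out.writeLong(value);
        }

        @Override
        public Long deserialize(DataInputStream in) throws IOException {
            return in.readLong();
        }
    };

    public static <T> byte[] toBytes(T value, SimpleSerializer<T> serializer) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            serializer.serialize(value, out);
        }
        return buffer.toByteArray();
    }

    public static <T> T fromBytes(byte[] bytes, SimpleSerializer<T> serializer) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            return serializer.deserialize(in);
        }
    }

    /** Migration of one value: deserialize with the prior serializer, re-serialize with the new one. */
    public static <T> byte[] migrate(
            byte[] priorBytes, SimpleSerializer<T> prior, SimpleSerializer<T> next) throws IOException {
        T value = fromBytes(priorBytes, prior);
        return toBytes(value, next);
    }

    public static void main(String[] args) throws IOException {
        byte[] priorBytes = toBytes(42L, PRIOR_SERIALIZER);
        byte[] newBytes = migrate(priorBytes, PRIOR_SERIALIZER, NEW_SERIALIZER);
        // The value is unchanged; only its byte layout differs.
        System.out.println(fromBytes(newBytes, NEW_SERIALIZER));
    }
}
```

   In a lazily deserializing backend this conversion would have to be applied to every stored entry of the affected state, which is why the sketch keeps `migrate` as a per-value operation.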
   
   ## Brief change log
   
   - Add `migrateSerializedValue` to `AbstractRocksDBStateValue`
   - Use the new method to iterate through RocksDB bytes when state migration 
is necessary
   - Adapt heap-based backends to fail when new serializers are 
incompatible.
   - Add a `StateMigrationTestBase` that verifies serializers are being used as 
expected for a restore procedure
   
   ## Verifying this change
   
   The main new test coverage that covers this is in the 
`StateMigrationTestBase` class.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (**yes** / no / don't know)
 - The runtime per-record code paths (performance sensitive): (**yes** / no 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (**yes** / no)
 - If yes, how is the feature documented? (not applicable / docs / 
**JavaDocs** / not documented)
   




> Implement state conversion procedure in state backends
> --
>
> Key: FLINK-9808
> URL: https://issues.apache.org/jira/browse/FLINK-9808
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Aljoscha Krettek
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> With FLINK-9377 in place and config snapshots serving as the single source 
> of truth for recreating restore serializers, the next step would be to 
> utilize this when performing a full-pass state conversion (i.e., read with 
> the old / restore serializer, write with the new serializer).
> For Flink's heap-based backends, state conversion 
> inherently happens, since all state is always deserialized after restore with 
> the restore serializer, and written with the new serializer on snapshots.
> For the RocksDB state backend, since state is lazily deserialized, state 
> conversion needs to happen for each registered state on its first access if 
> the newly registered serializer has a different serialization schema than the 
> previous serializer.
> This task should consist of three parts:
> 1. Allow {{CompatibilityResult}} to correctly distinguish between whether the 
> new serializer's schema is a) compatible with the serializer as it is, b) 
> compatible after the serializer has been reconfigured, or c) incompatible.
> 2. Introduce state conversion procedures in the RocksDB state backend. This 
> should occur on the first state access.
> 3. Make sure that all other backends no longer do redundant serializer 
> compatibility checks. That is not required because those backends always 
> perform full-pass state conversions.





[GitHub] tzulitai opened a new pull request #6931: [FLINK-9808] [state-backends] Migrate state when necessary

2018-10-25 Thread GitBox
tzulitai opened a new pull request #6931: [FLINK-9808] [state-backends] Migrate 
state when necessary
URL: https://github.com/apache/flink/pull/6931
 
 
   ## What is the purpose of the change
   
   This PR is based on #6930. Only the last commit is relevant.
   
   This PR adds the procedure of migrating state (i.e., reading state bytes 
with the restored prior serializer, and then re-writing the deserialized object 
with the new serializer).
   
   - This should only ever occur in the RocksDB state backend, since the 
restore / snapshotting of heap-based backends is already a migration process 
by nature. This is identified by a result of 
`TypeSerializerSchemaCompatibility.compatibleAfterMigration()` on the state 
serializer's compatibility check.
   - For heap-based backends, we only need to check that the new state 
serializer isn't incompatible. This is identified by a result of 
`TypeSerializerSchemaCompatibility.incompatible()` on the state serializer's 
compatibility check.
   
   ## Brief change log
   
   - Add `migrateSerializedValue` to `AbstractRocksDBStateValue`
   - Use the new method to iterate through RocksDB bytes when state migration 
is necessary
   - Adapt heap-based backends to fail when new serializers are 
incompatible.
   - Add a `StateMigrationTestBase` that verifies serializers are being used as 
expected for a restore procedure
   
   ## Verifying this change
   
   The main new test coverage that covers this is in the 
`StateMigrationTestBase` class.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (**yes** / no / don't know)
 - The runtime per-record code paths (performance sensitive): (**yes** / no 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (**yes** / no)
 - If yes, how is the feature documented? (not applicable / docs / 
**JavaDocs** / not documented)
   




[GitHub] tzulitai opened a new pull request #6930: [FLINK-10679] Remove deprecated CompatibilityResult and related classes from framework code

2018-10-25 Thread GitBox
tzulitai opened a new pull request #6930: [FLINK-10679] Remove deprecated 
CompatibilityResult and related classes from framework code
URL: https://github.com/apache/flink/pull/6930
 
 
   ## What is the purpose of the change
   
   The main purpose of this PR is the following:
   
   1) The now deprecated `CompatibilityResult` and related classes (such as 
`CompatibilityUtil` and `UnloadableDummyTypeSerializer`) should no longer leak 
into Flink's framework code.
   2) This requires letting the new 
`TypeSerializerSchemaCompatibility#resolveSchemaCompatibility` method be the 
only entry point for compatibility checks.
   3) `TypeSerializerConfigSnapshot#resolveSchemaCompatibility` should then 
serve as the adapter to delegate compatibility checks back to the serializer's 
`ensureCompatibility` method.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (**yes** / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)
   




[jira] [Updated] (FLINK-10679) Let TypeSerializerSchemaCompatibility.resolveSchemaCompatibility() be the entry point for compatibility checks in framework code

2018-10-25 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10679?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10679:
---
Labels: pull-request-available  (was: )

> Let TypeSerializerSchemaCompatibility.resolveSchemaCompatibility() be the 
> entry point for compatibility checks in framework code
> 
>
> Key: FLINK-10679
> URL: https://issues.apache.org/jira/browse/FLINK-10679
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing, Type Serialization System
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Stephan Ewen
>Priority: Major
>  Labels: pull-request-available
>
> Currently, the state backend framework code is still exposed to the now 
> deprecated {{CompatibilityResult}} and related classes.
> Instead, all compatibility checks should go through the new 
> {{TypeSerializerSchemaCompatibility#resolveSchemaCompatibility}} method, and 
> allow framework code to check against the more powerful 
> {{TypeSerializerSchemaCompatibility}} for incompatibility / migration 
> requirements.
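
To make the "incompatibility / migration requirements" check more concrete, the following is a small sketch of how framework code might branch on a three-way compatibility result. It is an assumption-laden illustration, not Flink's actual API: the `Compatibility` and `Action` enums and the `decide` method are hypothetical names, and the mapping reflects only what the related PR descriptions state (heap-based backends fail only on incompatibility, while a lazily deserializing backend such as RocksDB migrates when the result is "compatible after migration").

```java
public class CompatibilityDispatchSketch {

    /** Hypothetical three-way result, mirroring the cases named in the issue text. */
    public enum Compatibility { COMPATIBLE_AS_IS, COMPATIBLE_AFTER_MIGRATION, INCOMPATIBLE }

    /** Hypothetical actions a backend could take on restore. */
    public enum Action { USE_NEW_SERIALIZER, MIGRATE_STATE, FAIL }

    /**
     * Decides what to do for one registered state.
     *
     * @param result outcome of the schema compatibility check
     * @param lazilyDeserializing true for a backend that keeps state as bytes until access
     */
    public static Action decide(Compatibility result, boolean lazilyDeserializing) {
        switch (result) {
            case COMPATIBLE_AS_IS:
                return Action.USE_NEW_SERIALIZER;
            case COMPATIBLE_AFTER_MIGRATION:
                // Backends that deserialize everything on restore convert state implicitly,
                // so only a lazily deserializing backend needs an explicit migration pass.
                return lazilyDeserializing ? Action.MIGRATE_STATE : Action.USE_NEW_SERIALIZER;
            case INCOMPATIBLE:
                return Action.FAIL;
            default:
                throw new IllegalStateException("Unidentifiable compatibility type: " + result);
        }
    }

    public static void main(String[] args) {
        System.out.println(decide(Compatibility.COMPATIBLE_AFTER_MIGRATION, true));
        System.out.println(decide(Compatibility.COMPATIBLE_AFTER_MIGRATION, false));
        System.out.println(decide(Compatibility.INCOMPATIBLE, false));
    }
}
```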





[jira] [Commented] (FLINK-10679) Let TypeSerializerSchemaCompatibility.resolveSchemaCompatibility() be the entry point for compatibility checks in framework code

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16663977#comment-16663977
 ] 

ASF GitHub Bot commented on FLINK-10679:


tzulitai opened a new pull request #6930: [FLINK-10679] Remove deprecated 
CompatibilityResult and related classes from framework code
URL: https://github.com/apache/flink/pull/6930
 
 
   ## What is the purpose of the change
   
   The main purpose of this PR is the following:
   
   1) The now deprecated `CompatibilityResult` and related classes (such as 
`CompatibilityUtil` and `UnloadableDummyTypeSerializer`) should no longer leak 
into Flink's framework code.
   2) This requires letting the new 
`TypeSerializerSchemaCompatibility#resolveSchemaCompatibility` method be the 
only entry point for compatibility checks.
   3) `TypeSerializerConfigSnapshot#resolveSchemaCompatibility` should then 
serve as the adapter to delegate compatibility checks back to the serializer's 
`ensureCompatibility` method.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (**yes** / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)
   




> Let TypeSerializerSchemaCompatibility.resolveSchemaCompatibility() be the 
> entry point for compatibility checks in framework code
> 
>
> Key: FLINK-10679
> URL: https://issues.apache.org/jira/browse/FLINK-10679
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing, Type Serialization System
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Stephan Ewen
>Priority: Major
>  Labels: pull-request-available
>
> Currently, the state backend framework code is still exposed to the now 
> deprecated {{CompatibilityResult}} and related classes.
> Instead, all compatibility checks should go through the new 
> {{TypeSerializerSchemaCompatibility#resolveSchemaCompatibility}} method, and 
> allow framework code to check against the more powerful 
> {{TypeSerializerSchemaCompatibility}} for incompatibility / migration 
> requirements.





[jira] [Commented] (FLINK-10599) Provide documentation for the modern kafka connector

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16663950#comment-16663950
 ] 

ASF GitHub Bot commented on FLINK-10599:


yanghua commented on issue #6889: [FLINK-10599][Documentation] Provide 
documentation for the modern kafka connector
URL: https://github.com/apache/flink/pull/6889#issuecomment-433110004
 
 
   @pnowojski updated.




> Provide documentation for the modern kafka connector
> 
>
> Key: FLINK-10599
> URL: https://issues.apache.org/jira/browse/FLINK-10599
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation, Kafka Connector
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>






[GitHub] yanghua commented on issue #6889: [FLINK-10599][Documentation] Provide documentation for the modern kafka connector

2018-10-25 Thread GitBox
yanghua commented on issue #6889: [FLINK-10599][Documentation] Provide 
documentation for the modern kafka connector
URL: https://github.com/apache/flink/pull/6889#issuecomment-433110004
 
 
   @pnowojski updated.




[GitHub] tillrohrmann opened a new pull request #6929: [BP-1.6][FLINK-10681] Harden ElasticsearchSinkITCase against wrong JNA library

2018-10-25 Thread GitBox
tillrohrmann opened a new pull request #6929: [BP-1.6][FLINK-10681] Harden 
ElasticsearchSinkITCase against wrong JNA library
URL: https://github.com/apache/flink/pull/6929
 
 
   Backport of #6928 to `release-1.6`.




[jira] [Commented] (FLINK-10681) elasticsearch6.ElasticsearchSinkITCase fails if wrong JNA library installed

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16663933#comment-16663933
 ] 

ASF GitHub Bot commented on FLINK-10681:


tillrohrmann opened a new pull request #6928: [FLINK-10681] Harden 
ElasticsearchSinkITCase against wrong JNA library
URL: https://github.com/apache/flink/pull/6928
 
 
   ## What is the purpose of the change
   
   Set the system property jna.nosys=true to avoid ElasticsearchSinkITCase test 
failures
   due to a wrong JNA library version.
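
   For illustration only, this is one way the property could be set programmatically. How this PR actually sets it (for example in the test class or in the build configuration) is not shown in this message, so the class and method names below are hypothetical. The property has to be set before `com.sun.jna.Native` is first loaded, because that is when JNA decides which native library to use.

```java
public class JnaNoSysSketch {

    /**
     * Asks JNA to ignore any system-installed native library and use the one
     * bundled with the JNA jar. Must run before com.sun.jna.Native is loaded.
     */
    public static void preferBundledJna() {
        System.setProperty("jna.nosys", "true");
    }

    public static void main(String[] args) {
        preferBundledJna();
        System.out.println("jna.nosys=" + System.getProperty("jna.nosys"));
    }
}
```

   The same effect can be had by passing `-Djna.nosys=true` to the JVM that runs the tests.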
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   




> elasticsearch6.ElasticsearchSinkITCase fails if wrong JNA library installed
> ---
>
> Key: FLINK-10681
> URL: https://issues.apache.org/jira/browse/FLINK-10681
> Project: Flink
>  Issue Type: Bug
>  Components: ElasticSearch Connector, Tests
>Affects Versions: 1.6.1, 1.7.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.6.3, 1.7.0
>
>
> The {{elasticsearch6.ElasticsearchSinkITCase}} fails on systems where a wrong 
> JNA library is installed.
> {code}
> There is an incompatible JNA native library installed on this system
> Expected: 5.2.0
> Found:4.0.0
> /usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib.
> To resolve this issue you may do one of the following:
>  - remove or uninstall the offending library
>  - set the system property jna.nosys=true
>  - set jna.boot.library.path to include the path to the version of the
>jnidispatch library included with the JNA jar file you are using
> at com.sun.jna.Native.<clinit>(Native.java:199)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:264)
> at org.elasticsearch.bootstrap.Natives.<clinit>(Natives.java:45)
> at 
> org.elasticsearch.bootstrap.BootstrapInfo.isMemoryLocked(BootstrapInfo.java:50)
> at 
> org.elasticsearch.monitor.process.ProcessProbe.processInfo(ProcessProbe.java:130)
> at 
> org.elasticsearch.monitor.process.ProcessService.<init>(ProcessService.java:44)
> at 
> org.elasticsearch.monitor.MonitorService.<init>(MonitorService.java:48)
> at org.elasticsearch.node.Node.<init>(Node.java:363)
> at 
> org.apache.flink.streaming.connectors.elasticsearch.EmbeddedElasticsearchNodeEnvironmentImpl$PluginNode.<init>(EmbeddedElasticsearchNodeEnvironmentImpl.java:85)
> at 
> org.apache.flink.streaming.connectors.elasticsearch.EmbeddedElasticsearchNodeEnvironmentImpl.start(EmbeddedElasticsearchNodeEnvironmentImpl.java:53)
> at 
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase.prepare(ElasticsearchSinkTestBase.java:73)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
> at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> at 
> org.jun

[jira] [Updated] (FLINK-10681) elasticsearch6.ElasticsearchSinkITCase fails if wrong JNA library installed

2018-10-25 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10681?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10681:
---
Labels: pull-request-available  (was: )

> elasticsearch6.ElasticsearchSinkITCase fails if wrong JNA library installed
> ---
>
> Key: FLINK-10681
> URL: https://issues.apache.org/jira/browse/FLINK-10681
> Project: Flink
>  Issue Type: Bug
>  Components: ElasticSearch Connector, Tests
>Affects Versions: 1.6.1, 1.7.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.6.3, 1.7.0
>
>
> The {{elasticsearch6.ElasticsearchSinkITCase}} fails on systems where a wrong 
> JNA library is installed.
> {code}
> There is an incompatible JNA native library installed on this system
> Expected: 5.2.0
> Found:4.0.0
> /usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib.
> To resolve this issue you may do one of the following:
>  - remove or uninstall the offending library
>  - set the system property jna.nosys=true
>  - set jna.boot.library.path to include the path to the version of the
>jnidispatch library included with the JNA jar file you are using
> at com.sun.jna.Native.<clinit>(Native.java:199)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:264)
> at org.elasticsearch.bootstrap.Natives.<clinit>(Natives.java:45)
> at 
> org.elasticsearch.bootstrap.BootstrapInfo.isMemoryLocked(BootstrapInfo.java:50)
> at 
> org.elasticsearch.monitor.process.ProcessProbe.processInfo(ProcessProbe.java:130)
> at 
> org.elasticsearch.monitor.process.ProcessService.<init>(ProcessService.java:44)
> at 
> org.elasticsearch.monitor.MonitorService.<init>(MonitorService.java:48)
> at org.elasticsearch.node.Node.<init>(Node.java:363)
> at 
> org.apache.flink.streaming.connectors.elasticsearch.EmbeddedElasticsearchNodeEnvironmentImpl$PluginNode.<init>(EmbeddedElasticsearchNodeEnvironmentImpl.java:85)
> at 
> org.apache.flink.streaming.connectors.elasticsearch.EmbeddedElasticsearchNodeEnvironmentImpl.start(EmbeddedElasticsearchNodeEnvironmentImpl.java:53)
> at 
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase.prepare(ElasticsearchSinkTestBase.java:73)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
> at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> at 
> org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:252)
> at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:141)
> at 
> org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:112)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.apache.maven.surefire.util.ReflectionUtils.invokeMethodWithArray(ReflectionUtils.java:189)
> at 
> org.apache.maven.surefire.booter.ProviderFactory$ProviderProxy.invoke(ProviderFactory.java:165)
> at 
> org.apache.maven.surefire.booter.ProviderFactory.invokeProvider(ProviderFactory.java:85)
> at 
> org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:113)
> at 
> org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:75)
> {code}
> I pro

[GitHub] tillrohrmann opened a new pull request #6928: [FLINK-10681] Harden ElasticsearchSinkITCase against wrong JNA library

2018-10-25 Thread GitBox
tillrohrmann opened a new pull request #6928: [FLINK-10681] Harden 
ElasticsearchSinkITCase against wrong JNA library
URL: https://github.com/apache/flink/pull/6928
 
 
   ## What is the purpose of the change
   
   Set the system property `jna.nosys=true` to avoid ElasticsearchSinkITCase test failures
   due to a wrong JNA library version.
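
   For reproducing or working around the failure locally, the same property could also be supplied from the shell. The sketch below is an assumption-laden illustration and not the change made in this pull request: `enable_jna_nosys` is a hypothetical helper name, and it relies on HotSpot JVMs reading the `JAVA_TOOL_OPTIONS` environment variable at startup, which would also cover JVMs forked by the build.

```shell
# Hypothetical local workaround, not the fix from the pull request.
# Appends -Djna.nosys=true to JAVA_TOOL_OPTIONS unless it is already present,
# so that JNA ignores a system-installed jnidispatch library.
enable_jna_nosys() {
    case " ${JAVA_TOOL_OPTIONS:-} " in
        *" -Djna.nosys=true "*)
            ;;  # already set, nothing to do
        *)
            # Keep any existing options and add the property at the end.
            export JAVA_TOOL_OPTIONS="${JAVA_TOOL_OPTIONS:+${JAVA_TOOL_OPTIONS} }-Djna.nosys=true"
            ;;
    esac
}
```

   After calling `enable_jna_nosys`, any JVM started from the same shell would pick up the property; whether that is preferable to setting it in the test itself is a judgment call.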
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   




[jira] [Updated] (FLINK-10681) elasticsearch6.ElasticsearchSinkITCase fails if wrong JNA library installed

2018-10-25 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10681?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-10681:
--
Fix Version/s: (was: 1.6.2)
   1.6.3

> elasticsearch6.ElasticsearchSinkITCase fails if wrong JNA library installed
> ---
>
> Key: FLINK-10681
> URL: https://issues.apache.org/jira/browse/FLINK-10681
> Project: Flink
>  Issue Type: Bug
>  Components: ElasticSearch Connector, Tests
>Affects Versions: 1.6.1, 1.7.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Critical
> Fix For: 1.6.3, 1.7.0
>
>
> The {{elasticsearch6.ElasticsearchSinkITCase}} fails on systems where a wrong 
> JNA library is installed.
> {code}
> There is an incompatible JNA native library installed on this system
> Expected: 5.2.0
> Found:4.0.0
> /usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib.
> To resolve this issue you may do one of the following:
>  - remove or uninstall the offending library
>  - set the system property jna.nosys=true
>  - set jna.boot.library.path to include the path to the version of the
>jnidispatch library included with the JNA jar file you are using
> at com.sun.jna.Native.<clinit>(Native.java:199)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:264)
> at org.elasticsearch.bootstrap.Natives.<clinit>(Natives.java:45)
> at 
> org.elasticsearch.bootstrap.BootstrapInfo.isMemoryLocked(BootstrapInfo.java:50)
> at 
> org.elasticsearch.monitor.process.ProcessProbe.processInfo(ProcessProbe.java:130)
> at 
> org.elasticsearch.monitor.process.ProcessService.<init>(ProcessService.java:44)
> at 
> org.elasticsearch.monitor.MonitorService.<init>(MonitorService.java:48)
> at org.elasticsearch.node.Node.<init>(Node.java:363)
> at 
> org.apache.flink.streaming.connectors.elasticsearch.EmbeddedElasticsearchNodeEnvironmentImpl$PluginNode.<init>(EmbeddedElasticsearchNodeEnvironmentImpl.java:85)
> at 
> org.apache.flink.streaming.connectors.elasticsearch.EmbeddedElasticsearchNodeEnvironmentImpl.start(EmbeddedElasticsearchNodeEnvironmentImpl.java:53)
> at 
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase.prepare(ElasticsearchSinkTestBase.java:73)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
> at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> at 
> org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:252)
> at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:141)
> at 
> org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:112)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.apache.maven.surefire.util.ReflectionUtils.invokeMethodWithArray(ReflectionUtils.java:189)
> at 
> org.apache.maven.surefire.booter.ProviderFactory$ProviderProxy.invoke(ProviderFactory.java:165)
> at 
> org.apache.maven.surefire.booter.ProviderFactory.invokeProvider(ProviderFactory.java:85)
> at 
> org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:113)
> at 
> org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:75)
> {code}
> I propose to solve the problem by se

[jira] [Commented] (FLINK-10357) Streaming File Sink end-to-end test failed with mismatch

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16663928#comment-16663928
 ] 

ASF GitHub Bot commented on FLINK-10357:


azagrebin commented on a change in pull request #6907: [FLINK-10357][tests] 
Improve StreamingFileSink E2E test stability.
URL: https://github.com/apache/flink/pull/6907#discussion_r228228728
 
 

 ##
 File path: flink-end-to-end-tests/test-scripts/test_streaming_file_sink.sh
 ##
 @@ -45,47 +40,55 @@ function wait_for_restart {
 }
 
 ###
-# Wait a specific number of successful checkpoints
-# to have happened
+# Get all lines in part files and sort them numerically.
 #
 # Globals:
-#   None
+#   OUTPUT_PATH
 # Arguments:
-#   $1: the job id
-#   $2: the number of expected successful checkpoints
-#   $3: timeout in seconds
+#   None
 # Returns:
 #   None
 ###
-function wait_for_number_of_checkpoints {
-local job_id=$1
-local expected_num_checkpoints=$2
-local timeout=$3
-local count=0
-
-echo "Starting to wait for completion of ${expected_num_checkpoints} checkpoints"
-while (($(get_completed_number_of_checkpoints ${job_id}) < ${expected_num_checkpoints})); do
+function get_complete_result {
+find "${OUTPUT_PATH}" -type f \( -iname "part-*" \) -exec cat {} + | sort -g
+}
 
-if [[ ${count} -gt ${timeout} ]]; then
-echo "A timeout occurred waiting for successful checkpoints"
+###
+# Waits until a number of values have been written within a timeout.
+# If the timeout expires, exit with return code 1.
+#
+# Globals:
+#   None
+# Arguments:
+#   $1: the number of expected values
+#   $2: timeout in seconds
+# Returns:
+#   None
+###
+function wait_for_complete_result {
+local expected_number_of_values=$1
+local polling_timeout=$2
+local polling_interval=1
+local seconds_elapsed=0
+
+local number_of_values=$(get_complete_result | tail -1)
+local previous_number_of_values=-1
+
+while [[ ${number_of_values} -lt ${expected_number_of_values} ]]; do
+if [[ ${seconds_elapsed} -ge ${polling_timeout} ]]; then
 
 Review comment:
   I meant unix epoch time in seconds:
   `date +%s`
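
   A minimal sketch of what a deadline based on `date +%s` could look like. `wait_until` is a hypothetical helper and not part of the script under review; it assumes a 1-second polling interval and that the condition is passed as a command.

```shell
# Hypothetical helper, not taken from test_streaming_file_sink.sh.
# Runs the given command once per second until it succeeds or until the
# deadline, computed from the Unix epoch time in seconds, has passed.
wait_until() {
    local timeout_seconds=$1
    shift
    local deadline=$(( $(date +%s) + timeout_seconds ))

    until "$@"; do
        if (( $(date +%s) >= deadline )); then
            echo "Timed out after ${timeout_seconds}s waiting for: $*" >&2
            return 1
        fi
        sleep 1
    done
}
```

   With this shape, time spent inside the polled command counts towards the timeout, which a pure iteration counter would not capture.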




> Streaming File Sink end-to-end test failed with mismatch
> 
>
> Key: FLINK-10357
> URL: https://issues.apache.org/jira/browse/FLINK-10357
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.6.0
>Reporter: Till Rohrmann
>Assignee: Gary Yao
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.6.3, 1.7.0
>
> Attachments: flink-logs.tgz, flink-streaming-file-sink-logs.tgz
>
>
> The {{Streaming File Sink end-to-end test}} failed on an Amazon instance with 
> the following result: 
> {code}
> FAIL File Streaming Sink: Output hash mismatch.  Got 
> f2000bbc18a889dc8ec4b6f2b47bf9f5, expected 6727342fdd3aae2129e61fc8f433fb6f.
> head hexdump of actual:
> 000   0  \n   1  \n   2  \n   3  \n   4  \n   5  \n   6  \n   7  \n
> 010   8  \n   9  \n
> 014
> {code}





[jira] [Updated] (FLINK-10681) elasticsearch6.ElasticsearchSinkITCase fails if wrong JNA library installed

2018-10-25 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10681?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-10681:
--
Summary: elasticsearch6.ElasticsearchSinkITCase fails if wrong JNA library 
installed  (was: elasticsearch6.ElasticsearchSinkITCase fails if wrong JNA 
librar installed)

> elasticsearch6.ElasticsearchSinkITCase fails if wrong JNA library installed
> ---
>
> Key: FLINK-10681
> URL: https://issues.apache.org/jira/browse/FLINK-10681
> Project: Flink
>  Issue Type: Bug
>  Components: ElasticSearch Connector, Tests
>Affects Versions: 1.6.1, 1.7.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Critical
> Fix For: 1.6.2, 1.7.0
>
>
> The {{elasticsearch6.ElasticsearchSinkITCase}} fails on systems where a wrong 
> JNA library is installed.
> {code}
> There is an incompatible JNA native library installed on this system
> Expected: 5.2.0
> Found:4.0.0
> /usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib.
> To resolve this issue you may do one of the following:
>  - remove or uninstall the offending library
>  - set the system property jna.nosys=true
>  - set jna.boot.library.path to include the path to the version of the
>jnidispatch library included with the JNA jar file you are using
> at com.sun.jna.Native.<clinit>(Native.java:199)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:264)
> at org.elasticsearch.bootstrap.Natives.<clinit>(Natives.java:45)
> at 
> org.elasticsearch.bootstrap.BootstrapInfo.isMemoryLocked(BootstrapInfo.java:50)
> at 
> org.elasticsearch.monitor.process.ProcessProbe.processInfo(ProcessProbe.java:130)
> at 
> org.elasticsearch.monitor.process.ProcessService.<init>(ProcessService.java:44)
> at 
> org.elasticsearch.monitor.MonitorService.<init>(MonitorService.java:48)
> at org.elasticsearch.node.Node.<init>(Node.java:363)
> at 
> org.apache.flink.streaming.connectors.elasticsearch.EmbeddedElasticsearchNodeEnvironmentImpl$PluginNode.<init>(EmbeddedElasticsearchNodeEnvironmentImpl.java:85)
> at 
> org.apache.flink.streaming.connectors.elasticsearch.EmbeddedElasticsearchNodeEnvironmentImpl.start(EmbeddedElasticsearchNodeEnvironmentImpl.java:53)
> at 
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase.prepare(ElasticsearchSinkTestBase.java:73)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
> at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> at 
> org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:252)
> at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:141)
> at 
> org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:112)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.apache.maven.surefire.util.ReflectionUtils.invokeMethodWithArray(ReflectionUtils.java:189)
> at 
> org.apache.maven.surefire.booter.ProviderFactory$ProviderProxy.invoke(ProviderFactory.java:165)
> at 
> org.apache.maven.surefire.booter.ProviderFactory.invokeProvider(ProviderFactory.java:85)
> at 
> org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:113)
> at 
> org.apa

[GitHub] azagrebin commented on a change in pull request #6907: [FLINK-10357][tests] Improve StreamingFileSink E2E test stability.

2018-10-25 Thread GitBox
azagrebin commented on a change in pull request #6907: [FLINK-10357][tests] 
Improve StreamingFileSink E2E test stability.
URL: https://github.com/apache/flink/pull/6907#discussion_r228228728
 
 

 ##
 File path: flink-end-to-end-tests/test-scripts/test_streaming_file_sink.sh
 ##
 @@ -45,47 +40,55 @@ function wait_for_restart {
 }
 
 ###
-# Wait a specific number of successful checkpoints
-# to have happened
+# Get all lines in part files and sort them numerically.
 #
 # Globals:
-#   None
+#   OUTPUT_PATH
 # Arguments:
-#   $1: the job id
-#   $2: the number of expected successful checkpoints
-#   $3: timeout in seconds
+#   None
 # Returns:
 #   None
 ###
-function wait_for_number_of_checkpoints {
-local job_id=$1
-local expected_num_checkpoints=$2
-local timeout=$3
-local count=0
-
-echo "Starting to wait for completion of ${expected_num_checkpoints} checkpoints"
-while (($(get_completed_number_of_checkpoints ${job_id}) < ${expected_num_checkpoints})); do
+function get_complete_result {
+find "${OUTPUT_PATH}" -type f \( -iname "part-*" \) -exec cat {} + | sort -g
+}
 
-if [[ ${count} -gt ${timeout} ]]; then
-echo "A timeout occurred waiting for successful checkpoints"
+###
+# Waits until a number of values have been written within a timeout.
+# If the timeout expires, exit with return code 1.
+#
+# Globals:
+#   None
+# Arguments:
+#   $1: the number of expected values
+#   $2: timeout in seconds
+# Returns:
+#   None
+###
+function wait_for_complete_result {
+local expected_number_of_values=$1
+local polling_timeout=$2
+local polling_interval=1
+local seconds_elapsed=0
+
+local number_of_values=$(get_complete_result | tail -1)
+local previous_number_of_values=-1
+
+while [[ ${number_of_values} -lt ${expected_number_of_values} ]]; do
+if [[ ${seconds_elapsed} -ge ${polling_timeout} ]]; then
 
 Review comment:
   I meant unix epoch time in seconds:
   `date +%s`




[jira] [Commented] (FLINK-10357) Streaming File Sink end-to-end test failed with mismatch

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16663923#comment-16663923
 ] 

ASF GitHub Bot commented on FLINK-10357:


GJL commented on a change in pull request #6907: [FLINK-10357][tests] Improve 
StreamingFileSink E2E test stability.
URL: https://github.com/apache/flink/pull/6907#discussion_r228226695
 
 

 ##
 File path: flink-end-to-end-tests/test-scripts/test_streaming_file_sink.sh
 ##
 @@ -45,47 +40,55 @@ function wait_for_restart {
 }
 
 ###
-# Wait a specific number of successful checkpoints
-# to have happened
+# Get all lines in part files and sort them numerically.
 #
 # Globals:
-#   None
+#   OUTPUT_PATH
 # Arguments:
-#   $1: the job id
-#   $2: the number of expected successful checkpoints
-#   $3: timeout in seconds
+#   None
 # Returns:
 #   None
 ###
-function wait_for_number_of_checkpoints {
-local job_id=$1
-local expected_num_checkpoints=$2
-local timeout=$3
-local count=0
-
-echo "Starting to wait for completion of ${expected_num_checkpoints} checkpoints"
-while (($(get_completed_number_of_checkpoints ${job_id}) < ${expected_num_checkpoints})); do
+function get_complete_result {
+find "${OUTPUT_PATH}" -type f \( -iname "part-*" \) -exec cat {} + | sort -g
+}
 
-if [[ ${count} -gt ${timeout} ]]; then
-echo "A timeout occurred waiting for successful checkpoints"
+###
+# Waits until a number of values have been written within a timeout.
+# If the timeout expires, exit with return code 1.
+#
+# Globals:
+#   None
+# Arguments:
+#   $1: the number of expected values
+#   $2: timeout in seconds
+# Returns:
+#   None
+###
+function wait_for_complete_result {
+local expected_number_of_values=$1
+local polling_timeout=$2
+local polling_interval=1
+local seconds_elapsed=0
+
+local number_of_values=$(get_complete_result | tail -1)
+local previous_number_of_values=-1
+
+while [[ ${number_of_values} -lt ${expected_number_of_values} ]]; do
+if [[ ${seconds_elapsed} -ge ${polling_timeout} ]]; then
 
 Review comment:
   Using wall-clock time has other drawbacks, such as DST clock shifts.
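
   One way to avoid reading the clock at all is to count polling intervals. This is only a sketch: the variable names `polling_timeout`, `polling_interval`, and `seconds_elapsed` come from the diff, while the function name `poll_with_counter` and the way the counter is advanced are assumptions, since the remainder of the loop is not shown here.

```shell
# Hypothetical helper illustrating a counter-based timeout.
# The elapsed time is approximated by summing the sleep intervals, so clock
# adjustments cannot affect it; time spent in the polled command is ignored.
poll_with_counter() {
    local polling_timeout=$1
    shift
    local polling_interval=1
    local seconds_elapsed=0

    until "$@"; do
        if [[ ${seconds_elapsed} -ge ${polling_timeout} ]]; then
            echo "Timed out after roughly ${seconds_elapsed}s waiting for: $*" >&2
            return 1
        fi
        sleep "${polling_interval}"
        seconds_elapsed=$((seconds_elapsed + polling_interval))
    done
}
```

   The trade-off is accuracy: if the polled command is slow, the real waiting time can exceed `polling_timeout` noticeably.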




> Streaming File Sink end-to-end test failed with mismatch
> 
>
> Key: FLINK-10357
> URL: https://issues.apache.org/jira/browse/FLINK-10357
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.6.0
>Reporter: Till Rohrmann
>Assignee: Gary Yao
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.6.3, 1.7.0
>
> Attachments: flink-logs.tgz, flink-streaming-file-sink-logs.tgz
>
>
> The {{Streaming File Sink end-to-end test}} failed on an Amazon instance with 
> the following result: 
> {code}
> FAIL File Streaming Sink: Output hash mismatch.  Got 
> f2000bbc18a889dc8ec4b6f2b47bf9f5, expected 6727342fdd3aae2129e61fc8f433fb6f.
> head hexdump of actual:
> 000   0  \n   1  \n   2  \n   3  \n   4  \n   5  \n   6  \n   7  \n
> 010   8  \n   9  \n
> 014
> {code}





[GitHub] GJL commented on a change in pull request #6907: [FLINK-10357][tests] Improve StreamingFileSink E2E test stability.

2018-10-25 Thread GitBox
GJL commented on a change in pull request #6907: [FLINK-10357][tests] Improve 
StreamingFileSink E2E test stability.
URL: https://github.com/apache/flink/pull/6907#discussion_r228226695
 
 

 ##
 File path: flink-end-to-end-tests/test-scripts/test_streaming_file_sink.sh
 ##
 @@ -45,47 +40,55 @@ function wait_for_restart {
 }
 
 ###
-# Wait a specific number of successful checkpoints
-# to have happened
+# Get all lines in part files and sort them numerically.
 #
 # Globals:
-#   None
+#   OUTPUT_PATH
 # Arguments:
-#   $1: the job id
-#   $2: the number of expected successful checkpoints
-#   $3: timeout in seconds
+#   None
 # Returns:
 #   None
 ###
-function wait_for_number_of_checkpoints {
-local job_id=$1
-local expected_num_checkpoints=$2
-local timeout=$3
-local count=0
-
-echo "Starting to wait for completion of ${expected_num_checkpoints} checkpoints"
-while (($(get_completed_number_of_checkpoints ${job_id}) < ${expected_num_checkpoints})); do
+function get_complete_result {
+find "${OUTPUT_PATH}" -type f \( -iname "part-*" \) -exec cat {} + | sort -g
+}
 
-if [[ ${count} -gt ${timeout} ]]; then
-echo "A timeout occurred waiting for successful checkpoints"
+###
+# Waits until a number of values have been written within a timeout.
+# If the timeout expires, exit with return code 1.
+#
+# Globals:
+#   None
+# Arguments:
+#   $1: the number of expected values
+#   $2: timeout in seconds
+# Returns:
+#   None
+###
+function wait_for_complete_result {
+local expected_number_of_values=$1
+local polling_timeout=$2
+local polling_interval=1
+local seconds_elapsed=0
+
+local number_of_values=$(get_complete_result | tail -1)
+local previous_number_of_values=-1
+
+while [[ ${number_of_values} -lt ${expected_number_of_values} ]]; do
+if [[ ${seconds_elapsed} -ge ${polling_timeout} ]]; then
 
 Review comment:
   Using wall-clock time has other drawbacks, such as DST clock shifts.




[jira] [Updated] (FLINK-10681) elasticsearch6.ElasticsearchSinkITCase fails if wrong JNA librar installed

2018-10-25 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10681?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-10681:
--
Summary: elasticsearch6.ElasticsearchSinkITCase fails if wrong JNA librar 
installed  (was: elasticsearch6.ElasticsearchSinkITCase fails if wrong JNA 
installed)

> elasticsearch6.ElasticsearchSinkITCase fails if wrong JNA librar installed
> --
>
> Key: FLINK-10681
> URL: https://issues.apache.org/jira/browse/FLINK-10681
> Project: Flink
>  Issue Type: Bug
>  Components: ElasticSearch Connector, Tests
>Affects Versions: 1.6.1, 1.7.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Critical
> Fix For: 1.6.2, 1.7.0
>
>
> The {{elasticsearch6.ElasticsearchSinkITCase}} fails on systems where a wrong 
> JNA library is installed.
> {code}
> There is an incompatible JNA native library installed on this system
> Expected: 5.2.0
> Found:4.0.0
> /usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib.
> To resolve this issue you may do one of the following:
>  - remove or uninstall the offending library
>  - set the system property jna.nosys=true
>  - set jna.boot.library.path to include the path to the version of the
>jnidispatch library included with the JNA jar file you are using
> at com.sun.jna.Native.<clinit>(Native.java:199)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:264)
> at org.elasticsearch.bootstrap.Natives.<clinit>(Natives.java:45)
> at 
> org.elasticsearch.bootstrap.BootstrapInfo.isMemoryLocked(BootstrapInfo.java:50)
> at 
> org.elasticsearch.monitor.process.ProcessProbe.processInfo(ProcessProbe.java:130)
> at 
> org.elasticsearch.monitor.process.ProcessService.<init>(ProcessService.java:44)
> at 
> org.elasticsearch.monitor.MonitorService.<init>(MonitorService.java:48)
> at org.elasticsearch.node.Node.<init>(Node.java:363)
> at 
> org.apache.flink.streaming.connectors.elasticsearch.EmbeddedElasticsearchNodeEnvironmentImpl$PluginNode.<init>(EmbeddedElasticsearchNodeEnvironmentImpl.java:85)
> at 
> org.apache.flink.streaming.connectors.elasticsearch.EmbeddedElasticsearchNodeEnvironmentImpl.start(EmbeddedElasticsearchNodeEnvironmentImpl.java:53)
> at 
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase.prepare(ElasticsearchSinkTestBase.java:73)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
> at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> at 
> org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:252)
> at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:141)
> at 
> org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:112)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.apache.maven.surefire.util.ReflectionUtils.invokeMethodWithArray(ReflectionUtils.java:189)
> at 
> org.apache.maven.surefire.booter.ProviderFactory$ProviderProxy.invoke(ProviderFactory.java:165)
> at 
> org.apache.maven.surefire.booter.ProviderFactory.invokeProvider(ProviderFactory.java:85)
> at 
> org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:113)
> at 
> org.apache.maven.

[jira] [Commented] (FLINK-10357) Streaming File Sink end-to-end test failed with mismatch

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16663919#comment-16663919
 ] 

ASF GitHub Bot commented on FLINK-10357:


GJL commented on a change in pull request #6907: [FLINK-10357][tests] Improve 
StreamingFileSink E2E test stability.
URL: https://github.com/apache/flink/pull/6907#discussion_r228225734
 
 

 ##
 File path: flink-end-to-end-tests/test-scripts/test_streaming_file_sink.sh
 ##
 @@ -45,47 +40,55 @@ function wait_for_restart {
 }
 
 ###
-# Wait a specific number of successful checkpoints
-# to have happened
+# Get all lines in part files and sort them numerically.
 #
 # Globals:
-#   None
+#   OUTPUT_PATH
 # Arguments:
-#   $1: the job id
-#   $2: the number of expected successful checkpoints
-#   $3: timeout in seconds
+#   None
 # Returns:
 #   None
 ###
-function wait_for_number_of_checkpoints {
-local job_id=$1
-local expected_num_checkpoints=$2
-local timeout=$3
-local count=0
-
-echo "Starting to wait for completion of ${expected_num_checkpoints} checkpoints"
-while (($(get_completed_number_of_checkpoints ${job_id}) < ${expected_num_checkpoints})); do
+function get_complete_result {
+find "${OUTPUT_PATH}" -type f \( -iname "part-*" \) -exec cat {} + | sort -g
+}
 
-if [[ ${count} -gt ${timeout} ]]; then
-echo "A timeout occurred waiting for successful checkpoints"
+###
+# Waits until a number of values have been written within a timeout.
+# If the timeout expires, exit with return code 1.
+#
+# Globals:
+#   None
+# Arguments:
+#   $1: the number of expected values
+#   $2: timeout in seconds
+# Returns:
+#   None
+###
+function wait_for_complete_result {
+local expected_number_of_values=$1
+local polling_timeout=$2
+local polling_interval=1
+local seconds_elapsed=0
+
+local number_of_values=$(get_complete_result | tail -1)
 
 Review comment:
   Good catch, I had forgotten about it. `$(get_complete_result | tail -1)` is 
even wrong because the values start at `0`, so the last value is one less than 
the number of values.
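
The off-by-one can be seen in a small sketch. It assumes, as the hexdump in the issue suggests, that the sink writes consecutive integers starting at 0, one per line. The function `fake_result` is a hypothetical stand-in for `get_complete_result` and is not part of the test script.

```shell
#!/usr/bin/env bash
# Hypothetical stand-in for get_complete_result: ten values, 0 through 9.
fake_result() {
    seq 0 9
}

# The last value is 9, although ten values were written.
last_value=$(fake_result | tail -1)

# Counting lines gives the actual number of values.
line_count=$(fake_result | wc -l | tr -d '[:space:]')

echo "last value: ${last_value}, number of values: ${line_count}"
```

A loop that compares `last_value` against the expected number of values would therefore wait for one value more than intended.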


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Streaming File Sink end-to-end test failed with mismatch
> 
>
> Key: FLINK-10357
> URL: https://issues.apache.org/jira/browse/FLINK-10357
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.6.0
>Reporter: Till Rohrmann
>Assignee: Gary Yao
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.6.3, 1.7.0
>
> Attachments: flink-logs.tgz, flink-streaming-file-sink-logs.tgz
>
>
> The {{Streaming File Sink end-to-end test}} failed on an Amazon instance with 
> the following result: 
> {code}
> FAIL File Streaming Sink: Output hash mismatch.  Got 
> f2000bbc18a889dc8ec4b6f2b47bf9f5, expected 6727342fdd3aae2129e61fc8f433fb6f.
> head hexdump of actual:
> 000   0  \n   1  \n   2  \n   3  \n   4  \n   5  \n   6  \n   7  \n
> 010   8  \n   9  \n
> 014
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10681) elasticsearch6.ElasticsearchSinkITCase fails if wrong JNA installed

2018-10-25 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-10681:
-

 Summary: elasticsearch6.ElasticsearchSinkITCase fails if wrong JNA 
installed
 Key: FLINK-10681
 URL: https://issues.apache.org/jira/browse/FLINK-10681
 Project: Flink
  Issue Type: Bug
  Components: ElasticSearch Connector, Tests
Affects Versions: 1.6.1, 1.7.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
 Fix For: 1.6.2, 1.7.0


The {{elasticsearch6.ElasticsearchSinkITCase}} fails on systems where an 
incompatible JNA native library is installed.

{code}
There is an incompatible JNA native library installed on this system
Expected: 5.2.0
Found:4.0.0
/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib.
To resolve this issue you may do one of the following:
 - remove or uninstall the offending library
 - set the system property jna.nosys=true
 - set jna.boot.library.path to include the path to the version of the
   jnidispatch library included with the JNA jar file you are using

at com.sun.jna.Native.(Native.java:199)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:264)
at org.elasticsearch.bootstrap.Natives.(Natives.java:45)
at org.elasticsearch.bootstrap.BootstrapInfo.isMemoryLocked(BootstrapInfo.java:50)
at org.elasticsearch.monitor.process.ProcessProbe.processInfo(ProcessProbe.java:130)
at org.elasticsearch.monitor.process.ProcessService.(ProcessService.java:44)
at org.elasticsearch.monitor.MonitorService.(MonitorService.java:48)
at org.elasticsearch.node.Node.(Node.java:363)
at org.apache.flink.streaming.connectors.elasticsearch.EmbeddedElasticsearchNodeEnvironmentImpl$PluginNode.(EmbeddedElasticsearchNodeEnvironmentImpl.java:85)
at org.apache.flink.streaming.connectors.elasticsearch.EmbeddedElasticsearchNodeEnvironmentImpl.start(EmbeddedElasticsearchNodeEnvironmentImpl.java:53)
at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase.prepare(ElasticsearchSinkTestBase.java:73)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:252)
at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:141)
at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:112)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.maven.surefire.util.ReflectionUtils.invokeMethodWithArray(ReflectionUtils.java:189)
at org.apache.maven.surefire.booter.ProviderFactory$ProviderProxy.invoke(ProviderFactory.java:165)
at org.apache.maven.surefire.booter.ProviderFactory.invokeProvider(ProviderFactory.java:85)
at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:113)
at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:75)
{code}

I propose to solve the problem by setting the system property 
{{jna.nosys=true}} to prefer the bundled JNA library.
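
For a local run, the same property can also be supplied from outside the build. The sketch below is only a workaround on the developer's machine, not the change proposed in this issue. It appends `-Djna.nosys=true` to `JAVA_TOOL_OPTIONS`, an environment variable that the JVM reads at startup, so JVMs started from this shell should pick it up. The helper name `enable_bundled_jna` is hypothetical.

```shell
#!/usr/bin/env bash
# Hypothetical helper: make JVMs started from this shell prefer the JNA
# library bundled in the jar over the one installed on the system.
enable_bundled_jna() {
    case " ${JAVA_TOOL_OPTIONS:-} " in
        *" -Djna.nosys=true "*)
            ;;  # already present, nothing to do
        *)
            # Append to any existing options instead of overwriting them.
            export JAVA_TOOL_OPTIONS="${JAVA_TOOL_OPTIONS:+${JAVA_TOOL_OPTIONS} }-Djna.nosys=true"
            ;;
    esac
}

enable_bundled_jna
echo "JAVA_TOOL_OPTIONS=${JAVA_TOOL_OPTIONS}"
```

Calling the helper more than once leaves the variable unchanged, so it can be sourced from a shell profile without accumulating duplicates.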



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10599) Provide documentation for the modern kafka connector

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16663914#comment-16663914
 ] 

ASF GitHub Bot commented on FLINK-10599:


pnowojski commented on a change in pull request #6889: 
[FLINK-10599][Documentation] Provide documentation for the modern kafka 
connector
URL: https://github.com/apache/flink/pull/6889#discussion_r228218658
 
 

 ##
 File path: docs/dev/connectors/kafka.md
 ##
 @@ -108,15 +108,15 @@ Note that the streaming connectors are currently not part of the binary distribu
 * Follow the instructions from [Kafka's quickstart](https://kafka.apache.org/documentation.html#quickstart) to download the code and launch a server (launching a Zookeeper and a Kafka server is required every time before starting the application).
 * If the Kafka and Zookeeper servers are running on a remote machine, then the `advertised.host.name` setting in the `config/server.properties` file must be set to the machine's IP address.
 
-## Modern Kafka Connector
+## Kafka 0.11+ Connector
 
 Review comment:
   Sorry for the previous comment. I've just realised that naming it `Kafka 0.11+ 
Connector` is a bit inconsistent, since everywhere else we claim support 
for `>= 1.0.0`. So maybe change this section to `## Kafka 1.0.0+ Connector`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Provide documentation for the modern kafka connector
> 
>
> Key: FLINK-10599
> URL: https://issues.apache.org/jira/browse/FLINK-10599
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation, Kafka Connector
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10357) Streaming File Sink end-to-end test failed with mismatch

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16663907#comment-16663907
 ] 

ASF GitHub Bot commented on FLINK-10357:


azagrebin commented on a change in pull request #6907: [FLINK-10357][tests] 
Improve StreamingFileSink E2E test stability.
URL: https://github.com/apache/flink/pull/6907#discussion_r228218936
 
 

 ##
 File path: flink-end-to-end-tests/test-scripts/test_streaming_file_sink.sh
 ##
 @@ -45,47 +40,55 @@ function wait_for_restart {
 }
 
 ###
-# Wait a specific number of successful checkpoints
-# to have happened
+# Get all lines in part files and sort them numerically.
 #
 # Globals:
-#   None
+#   OUTPUT_PATH
 # Arguments:
-#   $1: the job id
-#   $2: the number of expected successful checkpoints
-#   $3: timeout in seconds
+#   None
 # Returns:
 #   None
 ###
-function wait_for_number_of_checkpoints {
-local job_id=$1
-local expected_num_checkpoints=$2
-local timeout=$3
-local count=0
-
-echo "Starting to wait for completion of ${expected_num_checkpoints} checkpoints"
-while (($(get_completed_number_of_checkpoints ${job_id}) < ${expected_num_checkpoints})); do
+function get_complete_result {
+find "${OUTPUT_PATH}" -type f \( -iname "part-*" \) -exec cat {} + | sort -g
+}
 
-if [[ ${count} -gt ${timeout} ]]; then
-echo "A timeout occurred waiting for successful checkpoints"
+###
+# Waits until a number of values have been written within a timeout.
+# If the timeout expires, exit with return code 1.
+#
+# Globals:
+#   None
+# Arguments:
+#   $1: the number of expected values
+#   $2: timeout in seconds
+# Returns:
+#   None
+###
+function wait_for_complete_result {
+local expected_number_of_values=$1
+local polling_timeout=$2
+local polling_interval=1
+local seconds_elapsed=0
+
+local number_of_values=$(get_complete_result | tail -1)
 
 Review comment:
   Is there any reason to do this differently from the same action below?
   ```
   number_of_values=$(get_complete_result | wc -l | tr -d '[:space:]')
   ```
   In particular:
   `tail -1` vs `wc -l | tr -d '[:space:]'`
   `wc -l | tr -d '[:space:]'` should be more reliable.
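
A minimal sketch of the counting variant as a reusable helper follows. The name `count_values` is hypothetical. The `tr -d '[:space:]'` step matters because some `wc` implementations (for example on macOS) pad the count with leading spaces, and it also strips the trailing newline.

```shell
#!/usr/bin/env bash
# Hypothetical helper: reads lines on stdin and prints how many there are,
# with any whitespace that wc adds around the number removed.
count_values() {
    wc -l | tr -d '[:space:]'
}

three=$(printf '0\n1\n2\n' | count_values)
none=$(printf '' | count_values)

echo "three lines -> ${three}, empty input -> ${none}"
```

Unlike `tail -1`, the helper yields a number even when no part file has been written yet, so the first comparison in the polling loop is well defined.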


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Streaming File Sink end-to-end test failed with mismatch
> 
>
> Key: FLINK-10357
> URL: https://issues.apache.org/jira/browse/FLINK-10357
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.6.0
>Reporter: Till Rohrmann
>Assignee: Gary Yao
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.6.3, 1.7.0
>
> Attachments: flink-logs.tgz, flink-streaming-file-sink-logs.tgz
>
>
> The {{Streaming File Sink end-to-end test}} failed on an Amazon instance with 
> the following result: 
> {code}
> FAIL File Streaming Sink: Output hash mismatch.  Got 
> f2000bbc18a889dc8ec4b6f2b47bf9f5, expected 6727342fdd3aae2129e61fc8f433fb6f.
> head hexdump of actual:
> 000   0  \n   1  \n   2  \n   3  \n   4  \n   5  \n   6  \n   7  \n
> 010   8  \n   9  \n
> 014
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10357) Streaming File Sink end-to-end test failed with mismatch

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16663908#comment-16663908
 ] 

ASF GitHub Bot commented on FLINK-10357:


azagrebin commented on a change in pull request #6907: [FLINK-10357][tests] 
Improve StreamingFileSink E2E test stability.
URL: https://github.com/apache/flink/pull/6907#discussion_r228220548
 
 

 ##
 File path: flink-end-to-end-tests/test-scripts/test_streaming_file_sink.sh
 ##
 @@ -45,47 +40,55 @@ function wait_for_restart {
 }
 
 ###
-# Wait a specific number of successful checkpoints
-# to have happened
+# Get all lines in part files and sort them numerically.
 #
 # Globals:
-#   None
+#   OUTPUT_PATH
 # Arguments:
-#   $1: the job id
-#   $2: the number of expected successful checkpoints
-#   $3: timeout in seconds
+#   None
 # Returns:
 #   None
 ###
-function wait_for_number_of_checkpoints {
-local job_id=$1
-local expected_num_checkpoints=$2
-local timeout=$3
-local count=0
-
-echo "Starting to wait for completion of ${expected_num_checkpoints} checkpoints"
-while (($(get_completed_number_of_checkpoints ${job_id}) < ${expected_num_checkpoints})); do
+function get_complete_result {
+find "${OUTPUT_PATH}" -type f \( -iname "part-*" \) -exec cat {} + | sort -g
+}
 
-if [[ ${count} -gt ${timeout} ]]; then
-echo "A timeout occurred waiting for successful checkpoints"
+###
+# Waits until a number of values have been written within a timeout.
+# If the timeout expires, exit with return code 1.
+#
+# Globals:
+#   None
+# Arguments:
+#   $1: the number of expected values
+#   $2: timeout in seconds
+# Returns:
+#   None
+###
+function wait_for_complete_result {
+local expected_number_of_values=$1
+local polling_timeout=$2
+local polling_interval=1
+local seconds_elapsed=0
+
+local number_of_values=$(get_complete_result | tail -1)
+local previous_number_of_values=-1
+
+while [[ ${number_of_values} -lt ${expected_number_of_values} ]]; do
+if [[ ${seconds_elapsed} -ge ${polling_timeout} ]]; then
 
 Review comment:
   Computing `seconds_elapsed` as the current system time minus the start time 
   would be more precise, but it probably does not matter here.
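
A sketch of the wall-clock variant, under the assumption that `date +%s` is available on the test machines. The helper `poll_until` and its argument order are hypothetical and do not come from the pull request.

```shell
#!/usr/bin/env bash
# Hypothetical helper: polls a command until it succeeds or a timeout expires.
#   $1: timeout in seconds
#   $2: polling interval in seconds
#   remaining arguments: the command to poll
# Returns 0 if the command succeeded, 1 if the timeout expired first.
poll_until() {
    local polling_timeout=$1
    local polling_interval=$2
    shift 2

    local start_time
    start_time=$(date +%s)

    until "$@"; do
        # Elapsed time is read from the clock, so time spent inside the
        # polled command is counted as well, not only the sleeps.
        local seconds_elapsed=$(( $(date +%s) - start_time ))
        if (( seconds_elapsed >= polling_timeout )); then
            return 1
        fi
        sleep "${polling_interval}"
    done
    return 0
}

poll_until 5 1 true && echo "condition met"
poll_until 0 1 false || echo "timed out"
```

Incrementing a counter by the polling interval undercounts whenever the polled command itself is slow; reading the clock avoids that at the cost of one extra `date` call per iteration.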


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Streaming File Sink end-to-end test failed with mismatch
> 
>
> Key: FLINK-10357
> URL: https://issues.apache.org/jira/browse/FLINK-10357
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.6.0
>Reporter: Till Rohrmann
>Assignee: Gary Yao
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.6.3, 1.7.0
>
> Attachments: flink-logs.tgz, flink-streaming-file-sink-logs.tgz
>
>
> The {{Streaming File Sink end-to-end test}} failed on an Amazon instance with 
> the following result: 
> {code}
> FAIL File Streaming Sink: Output hash mismatch.  Got 
> f2000bbc18a889dc8ec4b6f2b47bf9f5, expected 6727342fdd3aae2129e61fc8f433fb6f.
> head hexdump of actual:
> 000   0  \n   1  \n   2  \n   3  \n   4  \n   5  \n   6  \n   7  \n
> 010   8  \n   9  \n
> 014
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10599) Provide documentation for the modern kafka connector

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16663900#comment-16663900
 ] 

ASF GitHub Bot commented on FLINK-10599:


pnowojski commented on a change in pull request #6889: 
[FLINK-10599][Documentation] Provide documentation for the modern kafka 
connector
URL: https://github.com/apache/flink/pull/6889#discussion_r228219609
 
 

 ##
 File path: docs/dev/connectors/kafka.md
 ##
 @@ -650,13 +650,13 @@ into a Kafka topic.
   for more explanation.
 
 
- Kafka 0.11
+ Kafka 0.11 and newer
 
-With Flink's checkpointing enabled, the `FlinkKafkaProducer011` can provide
+With Flink's checkpointing enabled, the `FlinkKafkaProducer011` (for modern Kafka connector, it is `FlinkKafkaProducer`.) can provide
 
 Review comment:
   ditto (`FlinkKafkaProducer` for Kafka >= 1.0.0 versions).


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Provide documentation for the modern kafka connector
> 
>
> Key: FLINK-10599
> URL: https://issues.apache.org/jira/browse/FLINK-10599
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation, Kafka Connector
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10599) Provide documentation for the modern kafka connector

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16663896#comment-16663896
 ] 

ASF GitHub Bot commented on FLINK-10599:


pnowojski commented on a change in pull request #6889: 
[FLINK-10599][Documentation] Provide documentation for the modern kafka 
connector
URL: https://github.com/apache/flink/pull/6889#discussion_r228218658
 
 

 ##
 File path: docs/dev/connectors/kafka.md
 ##
 @@ -108,15 +108,15 @@ Note that the streaming connectors are currently not part of the binary distribu
 * Follow the instructions from [Kafka's quickstart](https://kafka.apache.org/documentation.html#quickstart) to download the code and launch a server (launching a Zookeeper and a Kafka server is required every time before starting the application).
 * If the Kafka and Zookeeper servers are running on a remote machine, then the `advertised.host.name` setting in the `config/server.properties` file must be set to the machine's IP address.
 
-## Modern Kafka Connector
+## Kafka 0.11+ Connector
 
 Review comment:
   Sorry for the previous comment. I just realised that naming it `Kafka 0.11+ 
Connector` is a bit inconsistent, since everywhere else we claim support 
for `>= 1.0.0`. So maybe change this section to `## Kafka 1.0.0+ Connector`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Provide documentation for the modern kafka connector
> 
>
> Key: FLINK-10599
> URL: https://issues.apache.org/jira/browse/FLINK-10599
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation, Kafka Connector
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10599) Provide documentation for the modern kafka connector

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16663899#comment-16663899
 ] 

ASF GitHub Bot commented on FLINK-10599:


pnowojski commented on a change in pull request #6889: 
[FLINK-10599][Documentation] Provide documentation for the modern kafka 
connector
URL: https://github.com/apache/flink/pull/6889#discussion_r228217863
 
 

 ##
 File path: docs/dev/connectors/kafka.md
 ##
 @@ -85,7 +85,7 @@ For most users, the `FlinkKafkaConsumer08` (part of `flink-connector-kafka`) is
 1.7.0
 FlinkKafkaConsumer
 FlinkKafkaProducer
->= 1.x
+>= 1.0.0
 This Kafka connector attempts to track the latest version of the Kafka client. The version of the client it uses may change between Flink releases. Modern Kafka clients are backwards compatible with broker versions 0.10.0 or later.
 
 Review comment:
   Replace with:
   
   Modern Kafka clients are backwards compatible with broker versions 0.10.0 or 
later. However, for Kafka 0.11.x and 0.10.x versions, we recommend using the 
dedicated `flink-connector-kafka-0.11` and `flink-connector-kafka-0.10` 
connectors, respectively.
   
   ?
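
The recommendation can be sketched as a lookup from broker version to connector name. The function `connector_for_kafka_version` is hypothetical, and the names are the ones used in this comment; the actual Maven artifact ids may carry additional suffixes. Versions older than 0.10 are outside the scope of the comment, so the sketch rejects them.

```shell
#!/usr/bin/env bash
# Hypothetical helper: prints the connector name suggested in the review
# comment for a given Kafka broker version.
connector_for_kafka_version() {
    case "$1" in
        0.10.*)  echo "flink-connector-kafka-0.10" ;;
        0.11.*)  echo "flink-connector-kafka-0.11" ;;
        [1-9]*)  echo "flink-connector-kafka" ;;      # 1.0.0 and newer
        *)       echo "not covered by this sketch: $1" >&2; return 1 ;;
    esac
}

connector_for_kafka_version 0.10.2.1
connector_for_kafka_version 0.11.0.2
connector_for_kafka_version 1.1.0
```

Although the unversioned connector can talk to 0.10.x and 0.11.x brokers, the lookup follows the comment and prefers the dedicated connector for those versions.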


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Provide documentation for the modern kafka connector
> 
>
> Key: FLINK-10599
> URL: https://issues.apache.org/jira/browse/FLINK-10599
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation, Kafka Connector
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10599) Provide documentation for the modern kafka connector

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16663901#comment-16663901
 ] 

ASF GitHub Bot commented on FLINK-10599:


pnowojski commented on a change in pull request #6889: 
[FLINK-10599][Documentation] Provide documentation for the modern kafka 
connector
URL: https://github.com/apache/flink/pull/6889#discussion_r228219298
 
 

 ##
 File path: docs/dev/connectors/kafka.md
 ##
 @@ -524,7 +524,7 @@ In the meanwhile, a possible workaround is to send 
*heartbeat messages* to all c
 
 ## Kafka Producer
 
-Flink’s Kafka Producer is called `FlinkKafkaProducer011` (or `010` for Kafka 
0.10.0.x versions, etc.).
+Flink’s Kafka Producer is called `FlinkKafkaProducer011` (or `010` for Kafka 
0.10.0.x versions, etc. Note that since modern Kafka connector, it will not 
carry the version number of Kafka.).
 
 Review comment:
   ditto?




[jira] [Commented] (FLINK-10599) Provide documentation for the modern kafka connector

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16663897#comment-16663897
 ] 

ASF GitHub Bot commented on FLINK-10599:


pnowojski commented on a change in pull request #6889: 
[FLINK-10599][Documentation] Provide documentation for the modern kafka 
connector
URL: https://github.com/apache/flink/pull/6889#discussion_r228219170
 
 

 ##
 File path: docs/dev/connectors/kafka.md
 ##
 @@ -134,7 +134,7 @@ Then instantiate the new source (`FlinkKafkaConsumer`) and 
sink (`FlinkKafkaProd
 
 ## Kafka Consumer
 
-Flink's Kafka consumer is called `FlinkKafkaConsumer08` (or `09` for Kafka 
0.9.0.x versions, etc.). It provides access to one or more Kafka topics.
+Flink's Kafka consumer is called `FlinkKafkaConsumer08` (or `09` for Kafka 
0.9.0.x versions, etc. Note that since modern Kafka connector, it will not 
carry the version number of Kafka.). It provides access to one or more Kafka 
topics.
 
 Review comment:
   (or `09` for Kafka 0.9.0.x versions, etc., or just `FlinkKafkaConsumer` for 
Kafka >= 1.0.0 versions).
   
   ?




[jira] [Commented] (FLINK-10599) Provide documentation for the modern kafka connector

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16663898#comment-16663898
 ] 

ASF GitHub Bot commented on FLINK-10599:


pnowojski commented on a change in pull request #6889: 
[FLINK-10599][Documentation] Provide documentation for the modern kafka 
connector
URL: https://github.com/apache/flink/pull/6889#discussion_r228219659
 
 

 ##
 File path: docs/dev/connectors/kafka.md
 ##
 @@ -650,13 +650,13 @@ into a Kafka topic.
   for more explanation.
 
 
- Kafka 0.11
+ Kafka 0.11 and newer
 
-With Flink's checkpointing enabled, the `FlinkKafkaProducer011` can provide
+With Flink's checkpointing enabled, the `FlinkKafkaProducer011` (for modern 
Kafka connector, it is `FlinkKafkaProducer`.) can provide
 exactly-once delivery guarantees.
 
 Besides enabling Flink's checkpointing, you can also choose three different 
modes of operating
-chosen by passing appropriate `semantic` parameter to the 
`FlinkKafkaProducer011`:
+chosen by passing appropriate `semantic` parameter to the 
`FlinkKafkaProducer011` (for modern Kafka connector, it is 
`FlinkKafkaProducer`.):
 
 Review comment:
   ditto?




[GitHub] pnowojski commented on a change in pull request #6889: [FLINK-10599][Documentation] Provide documentation for the modern kafka connector

2018-10-25 Thread GitBox
pnowojski commented on a change in pull request #6889: 
[FLINK-10599][Documentation] Provide documentation for the modern kafka 
connector
URL: https://github.com/apache/flink/pull/6889#discussion_r228218658
 
 

 ##
 File path: docs/dev/connectors/kafka.md
 ##
 @@ -108,15 +108,15 @@ Note that the streaming connectors are currently not 
part of the binary distribu
 * Follow the instructions from [Kafka's 
quickstart](https://kafka.apache.org/documentation.html#quickstart) to download 
the code and launch a server (launching a Zookeeper and a Kafka server is 
required every time before starting the application).
 * If the Kafka and Zookeeper servers are running on a remote machine, then the 
`advertised.host.name` setting in the `config/server.properties` file must be 
set to the machine's IP address.
 
-## Modern Kafka Connector
+## Kafka 0.11+ Connector
 
 Review comment:
   Sorry for the previous comment. I just realised that naming it `Kafka 0.11+ 
Connector` is a bit inconsistent, since everywhere else we are claiming support 
for `>= 1.0.0`. So maybe change this section to `## Kafka 1.0.0+ Connector`?




With regards,
Apache Git Services


[jira] [Commented] (FLINK-10600) Provide End-to-end test cases for modern Kafka connectors

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16663891#comment-16663891
 ] 

ASF GitHub Bot commented on FLINK-10600:


yanghua commented on a change in pull request #6924: [FLINK-10600] Provide 
End-to-end test cases for modern Kafka connectors
URL: https://github.com/apache/flink/pull/6924#discussion_r228219980
 
 

 ##
 File path: flink-end-to-end-tests/test-scripts/test-data/modern-kafka-common.sh
 ##
 @@ -0,0 +1,152 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+set -o pipefail
+
+if [[ -z $TEST_DATA_DIR ]]; then
+  echo "Must run common.sh before modern-kafka-common.sh."
+  exit 1
+fi
+
+KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-2.0.0
+CONFLUENT_DIR=$TEST_DATA_DIR/confluent-5.0.0
+SCHEMA_REGISTRY_PORT=8082
+SCHEMA_REGISTRY_URL=http://localhost:${SCHEMA_REGISTRY_PORT}
+
+function setup_kafka_dist {
+  # download Kafka
+  mkdir -p $TEST_DATA_DIR
+  KAFKA_URL="https://archive.apache.org/dist/kafka/2.0.0/kafka_2.11-2.0.0.tgz"
+  echo "Downloading Kafka from $KAFKA_URL"
+  curl "$KAFKA_URL" > $TEST_DATA_DIR/modern-kafka.tgz
+
+  tar xzf $TEST_DATA_DIR/modern-kafka.tgz -C $TEST_DATA_DIR/
+
+  # fix kafka config
+  sed -i -e "s+^\(dataDir\s*=\s*\).*$+\1$TEST_DATA_DIR/zookeeper+" $KAFKA_DIR/config/zookeeper.properties
+  sed -i -e "s+^\(log\.dirs\s*=\s*\).*$+\1$TEST_DATA_DIR/kafka+" $KAFKA_DIR/config/server.properties
+}
+
+function setup_confluent_dist {
+  # download confluent
+  mkdir -p $TEST_DATA_DIR
+  CONFLUENT_URL="http://packages.confluent.io/archive/5.0/confluent-oss-5.0.0-2.11.tar.gz"
+  echo "Downloading confluent from $CONFLUENT_URL"
+  curl "$CONFLUENT_URL" > $TEST_DATA_DIR/modern-confluent.tgz
+
+  tar xzf $TEST_DATA_DIR/modern-confluent.tgz -C $TEST_DATA_DIR/
+
+  # fix confluent config
+  sed -i -e "s#listeners=http://0.0.0.0:8081#listeners=http://0.0.0.0:${SCHEMA_REGISTRY_PORT}#" $CONFLUENT_DIR/etc/schema-registry/schema-registry.properties
+}
+
+function start_kafka_cluster {
+  if [[ -z $KAFKA_DIR ]]; then
+    echo "Must run 'setup_kafka_dist' before attempting to start Kafka cluster"
+    exit 1
+  fi
+
+  $KAFKA_DIR/bin/zookeeper-server-start.sh -daemon $KAFKA_DIR/config/zookeeper.properties
+  $KAFKA_DIR/bin/kafka-server-start.sh -daemon $KAFKA_DIR/config/server.properties
+
+  # zookeeper outputs the "Node does not exist" bit to stderr
+  while [[ $($KAFKA_DIR/bin/zookeeper-shell.sh localhost:2181 get /brokers/ids/0 2>&1) =~ .*Node\ does\ not\ exist.* ]]; do
+    echo "Waiting for broker..."
+    sleep 1
+  done
+}
+
+function stop_kafka_cluster {
+  $KAFKA_DIR/bin/kafka-server-stop.sh
+  $KAFKA_DIR/bin/zookeeper-server-stop.sh
+
+  PIDS=$(jps -vl | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk '{print $1}')
+
+  if [ ! -z "$PIDS" ]; then
+    kill -s TERM $PIDS
+  fi
+
+  PIDS=$(jps -vl | grep java | grep -i QuorumPeerMain | grep -v grep | awk '{print $1}')
+
+  if [ ! -z "$PIDS" ]; then
+    kill -s TERM $PIDS
+  fi
+}
+
+function create_kafka_topic {
+  $KAFKA_DIR/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor $1 --partitions $2 --topic $3
+}
+
+function send_messages_to_kafka {
+  echo -e $1 | $KAFKA_DIR/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic $2
+}
+
+function read_messages_from_kafka {
+  $KAFKA_DIR/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning \
+    --max-messages $1 \
+    --topic $2 \
+    --consumer-property group.id=$3 2> /dev/null
+}
+
+function send_messages_to_kafka_avro {
 
 Review comment:
   It's used in `test_confluent_schema_registry.sh`
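
   On a related note, the broker wait loop in `start_kafka_cluster` in the 
quoted diff polls without an upper bound. A bounded variant could look roughly 
like the sketch below. Both `wait_until` and `broker_registered` are 
hypothetical names that do not appear in the PR; the probe assumes the same 
`zookeeper-shell.sh` check the script already uses.

```bash
#!/usr/bin/env bash

# Hypothetical helper: run a command repeatedly until it succeeds or the
# attempt budget is used up. Returns 0 on success, 1 on timeout.
wait_until() {
  local max_attempts=$1
  local delay_seconds=$2
  shift 2
  local attempt
  for ((attempt = 1; attempt <= max_attempts; attempt++)); do
    if "$@"; then
      return 0
    fi
    echo "Waiting... (attempt ${attempt}/${max_attempts})"
    sleep "$delay_seconds"
  done
  return 1
}

# Assumed probe mirroring the existing loop: succeeds once ZooKeeper no longer
# answers "Node does not exist" for broker id 0.
broker_registered() {
  local output
  output=$("$KAFKA_DIR"/bin/zookeeper-shell.sh localhost:2181 get /brokers/ids/0 2>&1)
  [[ ! $output =~ Node\ does\ not\ exist ]]
}

# Example call (needs a Kafka installation, so it is left commented out):
# wait_until 60 1 broker_registered || { echo "Broker did not start"; exit 1; }
```

   The helper takes the probe as a command, so the same loop could serve other 
readiness checks in the test scripts.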
   
   



[GitHub] yanghua commented on a change in pull request #6924: [FLINK-10600] Provide End-to-end test cases for modern Kafka connectors

2018-10-25 Thread GitBox
yanghua commented on a change in pull request #6924: [FLINK-10600] Provide 
End-to-end test cases for modern Kafka connectors
URL: https://github.com/apache/flink/pull/6924#discussion_r228220202
 
 

 ##
 File path: flink-end-to-end-tests/test-scripts/test-data/modern-kafka-common.sh
 ##
 @@ -0,0 +1,152 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+set -o pipefail
+
+if [[ -z $TEST_DATA_DIR ]]; then
+  echo "Must run common.sh before modern-kafka-common.sh."
+  exit 1
+fi
+
+KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-2.0.0
+CONFLUENT_DIR=$TEST_DATA_DIR/confluent-5.0.0
+SCHEMA_REGISTRY_PORT=8082
+SCHEMA_REGISTRY_URL=http://localhost:${SCHEMA_REGISTRY_PORT}
+
+function setup_kafka_dist {
+  # download Kafka
+  mkdir -p $TEST_DATA_DIR
+  KAFKA_URL="https://archive.apache.org/dist/kafka/2.0.0/kafka_2.11-2.0.0.tgz"
+  echo "Downloading Kafka from $KAFKA_URL"
+  curl "$KAFKA_URL" > $TEST_DATA_DIR/modern-kafka.tgz
+
+  tar xzf $TEST_DATA_DIR/modern-kafka.tgz -C $TEST_DATA_DIR/
+
+  # fix kafka config
+  sed -i -e "s+^\(dataDir\s*=\s*\).*$+\1$TEST_DATA_DIR/zookeeper+" $KAFKA_DIR/config/zookeeper.properties
+  sed -i -e "s+^\(log\.dirs\s*=\s*\).*$+\1$TEST_DATA_DIR/kafka+" $KAFKA_DIR/config/server.properties
+}
+
+function setup_confluent_dist {
+  # download confluent
+  mkdir -p $TEST_DATA_DIR
+  CONFLUENT_URL="http://packages.confluent.io/archive/5.0/confluent-oss-5.0.0-2.11.tar.gz"
+  echo "Downloading confluent from $CONFLUENT_URL"
+  curl "$CONFLUENT_URL" > $TEST_DATA_DIR/modern-confluent.tgz
+
+  tar xzf $TEST_DATA_DIR/modern-confluent.tgz -C $TEST_DATA_DIR/
+
+  # fix confluent config
+  sed -i -e "s#listeners=http://0.0.0.0:8081#listeners=http://0.0.0.0:${SCHEMA_REGISTRY_PORT}#" $CONFLUENT_DIR/etc/schema-registry/schema-registry.properties
+}
+
+function start_kafka_cluster {
+  if [[ -z $KAFKA_DIR ]]; then
+    echo "Must run 'setup_kafka_dist' before attempting to start Kafka cluster"
+    exit 1
+  fi
+
+  $KAFKA_DIR/bin/zookeeper-server-start.sh -daemon $KAFKA_DIR/config/zookeeper.properties
+  $KAFKA_DIR/bin/kafka-server-start.sh -daemon $KAFKA_DIR/config/server.properties
+
+  # zookeeper outputs the "Node does not exist" bit to stderr
+  while [[ $($KAFKA_DIR/bin/zookeeper-shell.sh localhost:2181 get /brokers/ids/0 2>&1) =~ .*Node\ does\ not\ exist.* ]]; do
+    echo "Waiting for broker..."
+    sleep 1
+  done
+}
+
+function stop_kafka_cluster {
+  $KAFKA_DIR/bin/kafka-server-stop.sh
+  $KAFKA_DIR/bin/zookeeper-server-stop.sh
+
+  PIDS=$(jps -vl | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk '{print $1}')
+
+  if [ ! -z "$PIDS" ]; then
+    kill -s TERM $PIDS
+  fi
+
+  PIDS=$(jps -vl | grep java | grep -i QuorumPeerMain | grep -v grep | awk '{print $1}')
+
+  if [ ! -z "$PIDS" ]; then
+    kill -s TERM $PIDS
+  fi
+}
+
+function create_kafka_topic {
+  $KAFKA_DIR/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor $1 --partitions $2 --topic $3
+}
+
+function send_messages_to_kafka {
+  echo -e $1 | $KAFKA_DIR/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic $2
+}
+
+function read_messages_from_kafka {
+  $KAFKA_DIR/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning \
+    --max-messages $1 \
+    --topic $2 \
+    --consumer-property group.id=$3 2> /dev/null
+}
+
+function send_messages_to_kafka_avro {
+echo -e $1 | $CONFLUENT_DIR/bin/kafka-avro-console-producer --broker-list localhost:9092 --topic $2 --property value.schema=$3 --property schema.registry.url=${SCHEMA_REGISTRY_URL}
+}
+
+function modify_num_partitions {
+  $KAFKA_DIR/bin/kafka-topics.sh --alter --topic $1 --partitions $2 --zookeeper localhost:2181
+}
+
+function get_num_partitions {
+  $KAFKA_DIR/bin/kafka-topics.sh --describe --topic $1 --zookeeper localhost:2181 | grep -Eo "PartitionCount:[0-9]+" | cut -d ":" -f 2
+}
+
+function get_partition_end_offset {
+  local topic=$1
+  local partition=$2
+
+  # first, use the console consumer to produce a dummy consumer group
+  read_messages_from_kafka 
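
To make the extraction in `get_num_partitions` from the quoted script easier to follow, here is a minimal sketch that runs the same `grep`/`cut` pipeline on a canned line instead of a live cluster. The sample text is an assumption about how the Kafka 2.0 tooling prints the topic header (other Kafka releases may format this line differently), and both the topic name and the helper name `parse_partition_count` are made up for illustration.

```bash
#!/usr/bin/env bash

# Assumed sample of the first line of `kafka-topics.sh --describe` output;
# the topic name is hypothetical.
sample_describe_output=$'Topic:test-input\tPartitionCount:3\tReplicationFactor:1\tConfigs:'

# Same pipeline as get_num_partitions, but reading from stdin so that it can
# be exercised without a running broker.
parse_partition_count() {
  grep -Eo "PartitionCount:[0-9]+" | cut -d ":" -f 2
}

partitions=$(printf '%s\n' "$sample_describe_output" | parse_partition_count)
echo "partitions=$partitions"
```

`grep -Eo` keeps only the `PartitionCount:<n>` token and `cut` then takes the field after the colon, so the pattern depends on there being no space between the colon and the number.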

[GitHub] pnowojski commented on a change in pull request #6889: [FLINK-10599][Documentation] Provide documentation for the modern kafka connector

2018-10-25 Thread GitBox
pnowojski commented on a change in pull request #6889: 
[FLINK-10599][Documentation] Provide documentation for the modern kafka 
connector
URL: https://github.com/apache/flink/pull/6889#discussion_r228219609
 
 

 ##
 File path: docs/dev/connectors/kafka.md
 ##
 @@ -650,13 +650,13 @@ into a Kafka topic.
   for more explanation.
 
 
- Kafka 0.11
+ Kafka 0.11 and newer
 
-With Flink's checkpointing enabled, the `FlinkKafkaProducer011` can provide
+With Flink's checkpointing enabled, the `FlinkKafkaProducer011` (for modern 
Kafka connector, it is `FlinkKafkaProducer`.) can provide
 
 Review comment:
   ditto (`FlinkKafkaProducer` for Kafka >= 1.0.0 versions)?




[jira] [Commented] (FLINK-10600) Provide End-to-end test cases for modern Kafka connectors

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16663889#comment-16663889
 ] 

ASF GitHub Bot commented on FLINK-10600:


yanghua commented on a change in pull request #6924: [FLINK-10600] Provide 
End-to-end test cases for modern Kafka connectors
URL: https://github.com/apache/flink/pull/6924#discussion_r228219566
 
 

 ##
 File path: flink-end-to-end-tests/test-scripts/test-data/modern-kafka-common.sh
 ##
 @@ -0,0 +1,152 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+set -o pipefail
+
+if [[ -z $TEST_DATA_DIR ]]; then
+  echo "Must run common.sh before modern-kafka-common.sh."
+  exit 1
+fi
+
+KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-2.0.0
+CONFLUENT_DIR=$TEST_DATA_DIR/confluent-5.0.0
+SCHEMA_REGISTRY_PORT=8082
+SCHEMA_REGISTRY_URL=http://localhost:${SCHEMA_REGISTRY_PORT}
+
+function setup_kafka_dist {
+  # download Kafka
+  mkdir -p $TEST_DATA_DIR
+  KAFKA_URL="https://archive.apache.org/dist/kafka/2.0.0/kafka_2.11-2.0.0.tgz"
+  echo "Downloading Kafka from $KAFKA_URL"
+  curl "$KAFKA_URL" > $TEST_DATA_DIR/modern-kafka.tgz
+
+  tar xzf $TEST_DATA_DIR/modern-kafka.tgz -C $TEST_DATA_DIR/
+
+  # fix kafka config
+  sed -i -e "s+^\(dataDir\s*=\s*\).*$+\1$TEST_DATA_DIR/zookeeper+" $KAFKA_DIR/config/zookeeper.properties
+  sed -i -e "s+^\(log\.dirs\s*=\s*\).*$+\1$TEST_DATA_DIR/kafka+" $KAFKA_DIR/config/server.properties
+}
+
+function setup_confluent_dist {
+  # download confluent
+  mkdir -p $TEST_DATA_DIR
+  CONFLUENT_URL="http://packages.confluent.io/archive/5.0/confluent-oss-5.0.0-2.11.tar.gz"
+  echo "Downloading confluent from $CONFLUENT_URL"
+  curl "$CONFLUENT_URL" > $TEST_DATA_DIR/modern-confluent.tgz
+
+  tar xzf $TEST_DATA_DIR/modern-confluent.tgz -C $TEST_DATA_DIR/
+
+  # fix confluent config
+  sed -i -e "s#listeners=http://0.0.0.0:8081#listeners=http://0.0.0.0:${SCHEMA_REGISTRY_PORT}#" $CONFLUENT_DIR/etc/schema-registry/schema-registry.properties
+}
+
+function start_kafka_cluster {
+  if [[ -z $KAFKA_DIR ]]; then
+    echo "Must run 'setup_kafka_dist' before attempting to start Kafka cluster"
+    exit 1
+  fi
+
+  $KAFKA_DIR/bin/zookeeper-server-start.sh -daemon $KAFKA_DIR/config/zookeeper.properties
+  $KAFKA_DIR/bin/kafka-server-start.sh -daemon $KAFKA_DIR/config/server.properties
+
+  # zookeeper outputs the "Node does not exist" bit to stderr
+  while [[ $($KAFKA_DIR/bin/zookeeper-shell.sh localhost:2181 get /brokers/ids/0 2>&1) =~ .*Node\ does\ not\ exist.* ]]; do
+    echo "Waiting for broker..."
+    sleep 1
+  done
+}
+
+function stop_kafka_cluster {
+  $KAFKA_DIR/bin/kafka-server-stop.sh
+  $KAFKA_DIR/bin/zookeeper-server-stop.sh
+
+  PIDS=$(jps -vl | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk '{print $1}')
 
 Review comment:
   Actually, I did not change the logic of this function, so it also exists in 
`kafka-common.sh`.
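
For readers following the thread, the cleanup step under discussion can be sketched roughly as follows. This is a minimal illustration and not the code from the pull request: the helper names `pids_matching` and `kill_leftovers` are hypothetical, and the process listing is read from stdin (for example from `jps -l`) so that the filter can be tried without a running JVM.

```shell
#!/usr/bin/env bash

# Hypothetical helper: read "PID main-class" lines on stdin (the format
# printed by `jps -l`) and print the PIDs of lines matching the pattern.
# $1 = case-insensitive grep pattern
pids_matching() {
  # `|| true` keeps "no match" from being reported as a failure.
  { grep -i -- "$1" || true; } | awk '{print $1}'
}

# Hypothetical helper: send SIGTERM to every PID read from stdin.
kill_leftovers() {
  while read -r pid; do
    if [ -n "$pid" ]; then
      # Ignore PIDs that have already exited.
      kill -s TERM "$pid" || true
    fi
  done
}
```

A call such as `jps -l | pids_matching 'kafka\.Kafka' | kill_leftovers` would then terminate any broker process that survived the regular stop script. Keeping the filter separate from the kill step makes it possible to check the pattern before anything is signalled.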




> Provide End-to-end test cases for modern Kafka connectors
> -
>
> Key: FLINK-10600
> URL: https://issues.apache.org/jira/browse/FLINK-10600
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>






[jira] [Commented] (FLINK-10600) Provide End-to-end test cases for modern Kafka connectors

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16663881#comment-16663881
 ] 

ASF GitHub Bot commented on FLINK-10600:


yanghua commented on a change in pull request #6924: [FLINK-10600] Provide 
End-to-end test cases for modern Kafka connectors
URL: https://github.com/apache/flink/pull/6924#discussion_r228217901
 
 

 ##
 File path: flink-end-to-end-tests/test-scripts/test-data/modern-kafka-common.sh
 ##
 @@ -0,0 +1,152 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+set -o pipefail
+
+if [[ -z $TEST_DATA_DIR ]]; then
+  echo "Must run common.sh before modern-kafka-common.sh."
+  exit 1
+fi
+
+KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-2.0.0
+CONFLUENT_DIR=$TEST_DATA_DIR/confluent-5.0.0
+SCHEMA_REGISTRY_PORT=8082
+SCHEMA_REGISTRY_URL=http://localhost:${SCHEMA_REGISTRY_PORT}
+
+function setup_kafka_dist {
+  # download Kafka
+  mkdir -p $TEST_DATA_DIR
+  KAFKA_URL="https://archive.apache.org/dist/kafka/2.0.0/kafka_2.11-2.0.0.tgz"
+  echo "Downloading Kafka from $KAFKA_URL"
+  curl "$KAFKA_URL" > $TEST_DATA_DIR/modern-kafka.tgz
+
+  tar xzf $TEST_DATA_DIR/modern-kafka.tgz -C $TEST_DATA_DIR/
+
+  # fix kafka config
+  sed -i -e "s+^\(dataDir\s*=\s*\).*$+\1$TEST_DATA_DIR/zookeeper+" $KAFKA_DIR/config/zookeeper.properties
+  sed -i -e "s+^\(log\.dirs\s*=\s*\).*$+\1$TEST_DATA_DIR/kafka+" $KAFKA_DIR/config/server.properties
+}
+
+function setup_confluent_dist {
 
 Review comment:
   It is used in `test_confluent_schema_registry`.




> Provide End-to-end test cases for modern Kafka connectors
> -
>
> Key: FLINK-10600
> URL: https://issues.apache.org/jira/browse/FLINK-10600
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>






[jira] [Commented] (FLINK-10599) Provide documentation for the modern kafka connector

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16663868#comment-16663868
 ] 

ASF GitHub Bot commented on FLINK-10599:


yanghua commented on issue #6889: [FLINK-10599][Documentation] Provide 
documentation for the modern kafka connector
URL: https://github.com/apache/flink/pull/6889#issuecomment-433089952
 
 
   @pnowojski thanks for your suggestion, updated.




> Provide documentation for the modern kafka connector
> 
>
> Key: FLINK-10599
> URL: https://issues.apache.org/jira/browse/FLINK-10599
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation, Kafka Connector
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>






[jira] [Commented] (FLINK-10096) Metrics report incorrect subtask ids

2018-10-25 Thread Piotr Nowojski (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16663869#comment-16663869
 ] 

Piotr Nowojski commented on FLINK-10096:


Maybe in that case renaming the ticket would be a better solution than 
closing it? Or creating a new one to fix the string representation?

> Metrics report incorrect subtask ids
> 
>
> Key: FLINK-10096
> URL: https://issues.apache.org/jira/browse/FLINK-10096
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.6.0
>Reporter: Piotr Nowojski
>Priority: Major
>
> Subtask id is off by one when it's being reported by metrics compared to what 
> is printed by {{StreamTask.toString()}}. For example, the metrics line:
> Initial sorter.137.buffers.inputQueueLength: 392
> vs actual name of this subtask:
> Initial sorter (138/192)





[jira] [Commented] (FLINK-10666) Port YarnClusterDescriptorTest to new codebase

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16663821#comment-16663821
 ] 

ASF GitHub Bot commented on FLINK-10666:


TisonKun edited a comment on issue #6919: [FLINK-10666] [tests] Port 
YarnClusterDescriptorTest to new codebase
URL: https://github.com/apache/flink/pull/6919#issuecomment-433075429
 
 
   cc @StefanRRichter @tillrohrmann @GJL 
   
   I think this pull request is quite trivial since it just replaces 
`LegacyYarnClusterDescriptor` with `YarnClusterDescriptor`. Our new 
`YarnClusterDescriptor` should and does pass the existing tests. Not many 
things to review.




> Port YarnClusterDescriptorTest to new codebase
> --
>
> Key: FLINK-10666
> URL: https://issues.apache.org/jira/browse/FLINK-10666
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Port {{YarnClusterDescriptorTest}} to new codebase.





[jira] [Commented] (FLINK-10666) Port YarnClusterDescriptorTest to new codebase

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16663820#comment-16663820
 ] 

ASF GitHub Bot commented on FLINK-10666:


TisonKun commented on issue #6919: [FLINK-10666] [tests] Port 
YarnClusterDescriptorTest to new codebase
URL: https://github.com/apache/flink/pull/6919#issuecomment-433075429
 
 
   cc @StefanRRichter @tillrohrmann
   
   I think this pull request is quite trivial since it just replaces 
`LegacyYarnClusterDescriptor` with `YarnClusterDescriptor`. Our new 
`YarnClusterDescriptor` should and does pass the existing tests. Not many 
things to review.




> Port YarnClusterDescriptorTest to new codebase
> --
>
> Key: FLINK-10666
> URL: https://issues.apache.org/jira/browse/FLINK-10666
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Port {{YarnClusterDescriptorTest}} to new codebase.





[GitHub] TisonKun commented on issue #6917: [FLINK-10665] [tests] Port YARNSessionFIFOITCase#testJavaAPI to new c…

2018-10-25 Thread GitBox
TisonKun commented on issue #6917: [FLINK-10665] [tests] Port 
YARNSessionFIFOITCase#testJavaAPI to new c…
URL: https://github.com/apache/flink/pull/6917#issuecomment-433074450
 
 
   @zentol I fixed the checkstyle issue. Can you review this?




[jira] [Commented] (FLINK-10665) Port YARNSessionFIFOITCase#testJavaAPI to new codebase

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10665?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16663816#comment-16663816
 ] 

ASF GitHub Bot commented on FLINK-10665:


TisonKun commented on issue #6917: [FLINK-10665] [tests] Port 
YARNSessionFIFOITCase#testJavaAPI to new c…
URL: https://github.com/apache/flink/pull/6917#issuecomment-433074450
 
 
   @zentol I fixed the checkstyle issue. Can you review this?




> Port YARNSessionFIFOITCase#testJavaAPI to new codebase
> --
>
> Key: FLINK-10665
> URL: https://issues.apache.org/jira/browse/FLINK-10665
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Port {{YARNSessionFIFOITCase#testJavaAPI}} to new codebase





[jira] [Commented] (FLINK-10096) Metrics report incorrect subtask ids

2018-10-25 Thread Chesnay Schepler (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16663806#comment-16663806
 ] 

Chesnay Schepler commented on FLINK-10096:
--

I did not close it because it was low priority, but because I sincerely do not 
have any plans to change the metric system as proposed here.

I'd rather change the string representation of stream tasks, since that is the 
inconsistent part; it conflicts with the metric system, the REST API, and all 
(most?) logging messages that log the subtask index.
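
The mismatch described in the ticket can be illustrated with a small sketch. It assumes, based only on the example in the issue description, that metrics use the zero-based subtask index while the task name shows that index plus one together with the parallelism. The function names are made up for illustration and are not Flink APIs.

```shell
#!/usr/bin/env bash

# Hypothetical: the metric scope uses the zero-based subtask index as-is.
# $1 = task name, $2 = zero-based subtask index
metric_scope() {
  printf '%s.%s\n' "$1" "$2"
}

# Hypothetical: the display name uses index + 1 and the parallelism.
# $1 = task name, $2 = zero-based subtask index, $3 = parallelism
display_name() {
  printf '%s (%s/%s)\n' "$1" "$(( $2 + 1 ))" "$3"
}

metric_scope "Initial sorter" 137      # prints: Initial sorter.137
display_name "Initial sorter" 137 192  # prints: Initial sorter (138/192)
```

Both lines refer to the same subtask; only the display form adds one, which is why changing the string representation would make the two agree.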

> Metrics report incorrect subtask ids
> 
>
> Key: FLINK-10096
> URL: https://issues.apache.org/jira/browse/FLINK-10096
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.6.0
>Reporter: Piotr Nowojski
>Priority: Major
>
> Subtask id is off by one when it's being reported by metrics compared to what 
> is printed by {{StreamTask.toString()}}. For example, the metrics line:
> Initial sorter.137.buffers.inputQueueLength: 392
> vs actual name of this subtask:
> Initial sorter (138/192)





[jira] [Commented] (FLINK-10668) Streaming File Sink E2E test fails because not all legitimate exceptions are excluded

2018-10-25 Thread Hequn Cheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10668?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16663794#comment-16663794
 ] 

Hequn Cheng commented on FLINK-10668:
-

[~gjy] Hi, thanks for the information about how to reproduce the error. It's 
very helpful! 

> Streaming File Sink E2E test fails because not all legitimate exceptions are 
> excluded
> -
>
> Key: FLINK-10668
> URL: https://issues.apache.org/jira/browse/FLINK-10668
> Project: Flink
>  Issue Type: Bug
>  Components: E2E Tests
>Affects Versions: 1.6.1, 1.7.0
>Reporter: Gary Yao
>Assignee: Hequn Cheng
>Priority: Critical
> Fix For: 1.6.3, 1.7.0
>
>
> Streaming File Sink E2E test fails because not all legitimate exceptions are 
> excluded.
> The stacktrace below can appear in the logs generated by the test but 
> {{check_logs_for_exceptions}} does not exclude all expected exceptions.
> {noformat}
> java.io.IOException: Connecting the channel failed: Connecting to remote task 
> manager + 'xxx/10.0.x.xx:50849' has failed. This might indicate that the 
> remote task manager has been lost.
>   at 
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:196)
>   at 
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:133)
>   at 
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:85)
>   at 
> org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:60)
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:166)
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:494)
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:525)
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:508)
>   at 
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:165)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:209)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: 
> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: 
> Connecting to remote task manager + 'xxx/10.0.x.xx:50849' has failed. 
> This might indicate that the remote task manager has been lost.
>   at 
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:219)
>   at 
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:133)
>   at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:511)
>   at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:504)
>   at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:483)
>   at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:424)
>   at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:121)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:327)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:343)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:633)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.ja
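
The kind of log check described in this issue can be sketched as follows. This is only an illustration under stated assumptions, not the actual `check_logs_for_exceptions` helper: the function name `count_unexpected_exceptions` and the idea of passing the expected messages as arguments are hypothetical.

```shell
#!/usr/bin/env bash

# Hypothetical: count the log lines that mention "exception" after dropping
# every line that contains one of the expected (allow-listed) fixed strings.
# $1 = log file, remaining arguments = expected substrings
count_unexpected_exceptions() {
  local log_file=$1
  shift
  local filtered expected
  filtered=$(cat "$log_file")
  for expected in "$@"; do
    # -F treats the expected text literally; -v drops matching lines.
    filtered=$(printf '%s\n' "$filtered" | grep -v -F -- "$expected" || true)
  done
  # grep -c prints 0 but exits non-zero when nothing matches, hence `|| true`.
  printf '%s\n' "$filtered" | grep -i -c "exception" || true
}
```

With such a helper, an exception that is legitimate for the test (for instance the `RemoteTransportException` in the trace above) would be passed as an extra argument, and the test would fail only if the printed count is greater than zero.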

[jira] [Commented] (FLINK-10600) Provide End-to-end test cases for modern Kafka connectors

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16663757#comment-16663757
 ] 

ASF GitHub Bot commented on FLINK-10600:


pnowojski commented on a change in pull request #6924: [FLINK-10600] Provide 
End-to-end test cases for modern Kafka connectors
URL: https://github.com/apache/flink/pull/6924#discussion_r228179104
 
 

 ##
 File path: flink-end-to-end-tests/test-scripts/test-data/modern-kafka-common.sh
 ##
 @@ -0,0 +1,152 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+set -o pipefail
+
+if [[ -z $TEST_DATA_DIR ]]; then
+  echo "Must run common.sh before modern-kafka-common.sh."
+  exit 1
+fi
+
+KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-2.0.0
+CONFLUENT_DIR=$TEST_DATA_DIR/confluent-5.0.0
+SCHEMA_REGISTRY_PORT=8082
+SCHEMA_REGISTRY_URL=http://localhost:${SCHEMA_REGISTRY_PORT}
+
+function setup_kafka_dist {
+  # download Kafka
+  mkdir -p $TEST_DATA_DIR
+  KAFKA_URL="https://archive.apache.org/dist/kafka/2.0.0/kafka_2.11-2.0.0.tgz"
+  echo "Downloading Kafka from $KAFKA_URL"
+  curl "$KAFKA_URL" > $TEST_DATA_DIR/modern-kafka.tgz
+
+  tar xzf $TEST_DATA_DIR/modern-kafka.tgz -C $TEST_DATA_DIR/
+
+  # fix kafka config
+  sed -i -e "s+^\(dataDir\s*=\s*\).*$+\1$TEST_DATA_DIR/zookeeper+" 
$KAFKA_DIR/config/zookeeper.properties
+  sed -i -e "s+^\(log\.dirs\s*=\s*\).*$+\1$TEST_DATA_DIR/kafka+" 
$KAFKA_DIR/config/server.properties
+}
+
+function setup_confluent_dist {
+  # download confluent
+  mkdir -p $TEST_DATA_DIR
+  
CONFLUENT_URL="http://packages.confluent.io/archive/5.0/confluent-oss-5.0.0-2.11.tar.gz"
+  echo "Downloading confluent from $CONFLUENT_URL"
+  curl "$CONFLUENT_URL" > $TEST_DATA_DIR/modern-confluent.tgz
+
+  tar xzf $TEST_DATA_DIR/modern-confluent.tgz -C $TEST_DATA_DIR/
+
+  # fix confluent config
+  sed -i -e "s#listeners=http://0.0.0.0:8081#listeners=http://0.0.0.0:${SCHEMA_REGISTRY_PORT}#" $CONFLUENT_DIR/etc/schema-registry/schema-registry.properties
+}
+
+function start_kafka_cluster {
+  if [[ -z $KAFKA_DIR ]]; then
+echo "Must run 'setup_kafka_dist' before attempting to start Kafka cluster"
+exit 1
+  fi
+
+  $KAFKA_DIR/bin/zookeeper-server-start.sh -daemon 
$KAFKA_DIR/config/zookeeper.properties
+  $KAFKA_DIR/bin/kafka-server-start.sh -daemon 
$KAFKA_DIR/config/server.properties
+
+  # zookeeper outputs the "Node does not exist" bit to stderr
+  while [[ $($KAFKA_DIR/bin/zookeeper-shell.sh localhost:2181 get 
/brokers/ids/0 2>&1) =~ .*Node\ does\ not\ exist.* ]]; do
+echo "Waiting for broker..."
+sleep 1
+  done
+}
+
+function stop_kafka_cluster {
+  $KAFKA_DIR/bin/kafka-server-stop.sh
+  $KAFKA_DIR/bin/zookeeper-server-stop.sh
+
+  PIDS=$(jps -vl | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk 
'{print $1}')
 
 Review comment:
   Why didn't the script for Kafka 0.10 have this killing part? Should it have?
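
   If the fallback kill step is wanted for both the Kafka 0.10 script and the modern Kafka script, one option would be a small shared helper. The sketch below is only an illustration under stated assumptions: the function names `pids_matching` and `stop_java_processes_matching` are hypothetical and not part of the pull request, and `jps` is assumed to be on the PATH.

```shell
#!/usr/bin/env bash

# Hypothetical helper: reads `jps -vl` style lines ("<pid> <main class> <args>")
# from stdin and prints the PIDs of the lines that match the given pattern
# (case-insensitive).
function pids_matching {
  local pattern=$1
  grep -i -- "$pattern" | awk '{print $1}'
}

# Hypothetical helper: sends SIGTERM to every JVM whose `jps -vl` line matches.
function stop_java_processes_matching {
  local pids
  pids=$(jps -vl | pids_matching "$1")
  if [ -n "$pids" ]; then
    # Intentionally unquoted so that several PIDs become several arguments.
    kill -s TERM $pids
  fi
}
```

   With such a helper, `stop_kafka_cluster` in either script could call `stop_java_processes_matching 'kafka\.Kafka'` followed by `stop_java_processes_matching QuorumPeerMain` after the regular stop scripts.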


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Provide End-to-end test cases for modern Kafka connectors
> -
>
> Key: FLINK-10600
> URL: https://issues.apache.org/jira/browse/FLINK-10600
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10600) Provide End-to-end test cases for modern Kafka connectors

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16663756#comment-16663756
 ] 

ASF GitHub Bot commented on FLINK-10600:


pnowojski commented on a change in pull request #6924: [FLINK-10600] Provide 
End-to-end test cases for modern Kafka connectors
URL: https://github.com/apache/flink/pull/6924#discussion_r228181381
 
 

 ##
 File path: 
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/KafkaExample.java
 ##
 @@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.kafka;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+
+/**
+ * A simple example that shows how to read from and write to modern Kafka. 
This will read String messages
+ * from the input topic, parse them into a POJO type {@link KafkaEvent}, group 
by some key, and finally
+ * perform a rolling addition on each key for which the results are written 
back to another topic.
+ *
+ * This example also demonstrates using a watermark assigner to generate 
per-partition
+ * watermarks directly in the Flink Kafka consumer. For demonstration 
purposes, it is assumed that
+ * the String messages are formatted as a (word,frequency,timestamp) tuple.
+ *
+ * Example usage:
+ * --input-topic test-input --output-topic test-output --bootstrap.servers 
localhost:9092
+ * --zookeeper.connect localhost:2181 --group.id myconsumer
+ */
+public class KafkaExample {
+
+   public static void main(String[] args) throws Exception {
+   // parse input arguments
+   final ParameterTool parameterTool = 
ParameterTool.fromArgs(args);
+
+   if (parameterTool.getNumberOfParameters() < 5) {
+   System.out.println("Missing parameters!\n" +
+   "Usage: Kafka --input-topic  
--output-topic  " +
+   "--bootstrap.servers  " +
+   "--zookeeper.connect  --group.id 
");
+   return;
+   }
+
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   env.getConfig().disableSysoutLogging();
+   
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 
1));
+   env.enableCheckpointing(5000); // create a checkpoint every 5 
seconds
+   env.getConfig().setGlobalJobParameters(parameterTool); // make 
parameters available in the web interface
+   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+   DataStream<KafkaEvent> input = env
+   .addSource(
+   new FlinkKafkaConsumer<>(
+   
parameterTool.getRequired("input-topic"),
+   new KafkaEventSchema(),
+   parameterTool.getProperties())
+   .assignTimestampsAndWatermarks(new 
CustomWatermarkExtractor()))
+   .keyBy("word")
+   .map(new RollingAdditionMapper());
+
+   input.addSink(
+   new FlinkKafkaProducer<>(
+   parameterTool.getRequired("output-topic"),
+   new KafkaEventSchema(),
+   parameterTool.getProperties()));
 
 Review comment:
   I think it would be better to use `EXACTLY_ONCE` mode here



[jira] [Commented] (FLINK-10600) Provide End-to-end test cases for modern Kafka connectors

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16663758#comment-16663758
 ] 

ASF GitHub Bot commented on FLINK-10600:


pnowojski commented on a change in pull request #6924: [FLINK-10600] Provide 
End-to-end test cases for modern Kafka connectors
URL: https://github.com/apache/flink/pull/6924#discussion_r228179628
 
 

 ##
 File path: flink-end-to-end-tests/test-scripts/test-data/modern-kafka-common.sh
 ##
 @@ -0,0 +1,152 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+set -o pipefail
+
+if [[ -z $TEST_DATA_DIR ]]; then
+  echo "Must run common.sh before modern-kafka-common.sh."
+  exit 1
+fi
+
+KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-2.0.0
+CONFLUENT_DIR=$TEST_DATA_DIR/confluent-5.0.0
+SCHEMA_REGISTRY_PORT=8082
+SCHEMA_REGISTRY_URL=http://localhost:${SCHEMA_REGISTRY_PORT}
+
+function setup_kafka_dist {
+  # download Kafka
+  mkdir -p $TEST_DATA_DIR
+  KAFKA_URL="https://archive.apache.org/dist/kafka/2.0.0/kafka_2.11-2.0.0.tgz"
+  echo "Downloading Kafka from $KAFKA_URL"
+  curl "$KAFKA_URL" > $TEST_DATA_DIR/modern-kafka.tgz
+
+  tar xzf $TEST_DATA_DIR/modern-kafka.tgz -C $TEST_DATA_DIR/
+
+  # fix kafka config
+  sed -i -e "s+^\(dataDir\s*=\s*\).*$+\1$TEST_DATA_DIR/zookeeper+" 
$KAFKA_DIR/config/zookeeper.properties
+  sed -i -e "s+^\(log\.dirs\s*=\s*\).*$+\1$TEST_DATA_DIR/kafka+" 
$KAFKA_DIR/config/server.properties
+}
+
+function setup_confluent_dist {
+  # download confluent
+  mkdir -p $TEST_DATA_DIR
+  
CONFLUENT_URL="http://packages.confluent.io/archive/5.0/confluent-oss-5.0.0-2.11.tar.gz"
+  echo "Downloading confluent from $CONFLUENT_URL"
+  curl "$CONFLUENT_URL" > $TEST_DATA_DIR/modern-confluent.tgz
+
+  tar xzf $TEST_DATA_DIR/modern-confluent.tgz -C $TEST_DATA_DIR/
+
+  # fix confluent config
+  sed -i -e "s#listeners=http://0.0.0.0:8081#listeners=http://0.0.0.0:${SCHEMA_REGISTRY_PORT}#" $CONFLUENT_DIR/etc/schema-registry/schema-registry.properties
+}
+
+function start_kafka_cluster {
+  if [[ -z $KAFKA_DIR ]]; then
+echo "Must run 'setup_kafka_dist' before attempting to start Kafka cluster"
+exit 1
+  fi
+
+  $KAFKA_DIR/bin/zookeeper-server-start.sh -daemon 
$KAFKA_DIR/config/zookeeper.properties
+  $KAFKA_DIR/bin/kafka-server-start.sh -daemon 
$KAFKA_DIR/config/server.properties
+
+  # zookeeper outputs the "Node does not exist" bit to stderr
+  while [[ $($KAFKA_DIR/bin/zookeeper-shell.sh localhost:2181 get 
/brokers/ids/0 2>&1) =~ .*Node\ does\ not\ exist.* ]]; do
+echo "Waiting for broker..."
+sleep 1
+  done
+}
+
+function stop_kafka_cluster {
+  $KAFKA_DIR/bin/kafka-server-stop.sh
+  $KAFKA_DIR/bin/zookeeper-server-stop.sh
+
+  PIDS=$(jps -vl | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk 
'{print $1}')
+
+  if [ ! -z "$PIDS" ]; then
+kill -s TERM $PIDS
+  fi
+
+  PIDS=$(jps -vl | grep java | grep -i QuorumPeerMain | grep -v grep | awk 
'{print $1}')
+
+  if [ ! -z "$PIDS" ]; then
+kill -s TERM $PIDS
+  fi
+}
+
+function create_kafka_topic {
+  $KAFKA_DIR/bin/kafka-topics.sh --create --zookeeper localhost:2181 
--replication-factor $1 --partitions $2 --topic $3
+}
+
+function send_messages_to_kafka {
+  echo -e $1 | $KAFKA_DIR/bin/kafka-console-producer.sh --broker-list 
localhost:9092 --topic $2
+}
+
+function read_messages_from_kafka {
+  $KAFKA_DIR/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 
--from-beginning \
+--max-messages $1 \
+--topic $2 \
+--consumer-property group.id=$3 2> /dev/null
+}
+
+function send_messages_to_kafka_avro {
+echo -e $1 | $CONFLUENT_DIR/bin/kafka-avro-console-producer --broker-list 
localhost:9092 --topic $2 --property value.schema=$3 --property 
schema.registry.url=${SCHEMA_REGISTRY_URL}
+}
+
+function modify_num_partitions {
+  $KAFKA_DIR/bin/kafka-topics.sh --alter --topic $1 --partitions $2 
--zookeeper localhost:2181
+}
+
+function get_num_partitions {
+  $KAFKA_DIR/bin/kafka-topics.sh --describe --topic $1 --

[jira] [Commented] (FLINK-10600) Provide End-to-end test cases for modern Kafka connectors

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16663760#comment-16663760
 ] 

ASF GitHub Bot commented on FLINK-10600:


pnowojski commented on a change in pull request #6924: [FLINK-10600] Provide 
End-to-end test cases for modern Kafka connectors
URL: https://github.com/apache/flink/pull/6924#discussion_r228178672
 
 

 ##
 File path: flink-end-to-end-tests/test-scripts/test-data/modern-kafka-common.sh
 ##
 @@ -0,0 +1,152 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+set -o pipefail
+
+if [[ -z $TEST_DATA_DIR ]]; then
+  echo "Must run common.sh before modern-kafka-common.sh."
+  exit 1
+fi
+
+KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-2.0.0
+CONFLUENT_DIR=$TEST_DATA_DIR/confluent-5.0.0
+SCHEMA_REGISTRY_PORT=8082
+SCHEMA_REGISTRY_URL=http://localhost:${SCHEMA_REGISTRY_PORT}
+
+function setup_kafka_dist {
+  # download Kafka
+  mkdir -p $TEST_DATA_DIR
+  KAFKA_URL="https://archive.apache.org/dist/kafka/2.0.0/kafka_2.11-2.0.0.tgz"
+  echo "Downloading Kafka from $KAFKA_URL"
+  curl "$KAFKA_URL" > $TEST_DATA_DIR/modern-kafka.tgz
+
+  tar xzf $TEST_DATA_DIR/modern-kafka.tgz -C $TEST_DATA_DIR/
+
+  # fix kafka config
+  sed -i -e "s+^\(dataDir\s*=\s*\).*$+\1$TEST_DATA_DIR/zookeeper+" 
$KAFKA_DIR/config/zookeeper.properties
+  sed -i -e "s+^\(log\.dirs\s*=\s*\).*$+\1$TEST_DATA_DIR/kafka+" 
$KAFKA_DIR/config/server.properties
+}
+
+function setup_confluent_dist {
 
 Review comment:
   Is this used anywhere?




[jira] [Commented] (FLINK-10600) Provide End-to-end test cases for modern Kafka connectors

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16663754#comment-16663754
 ] 

ASF GitHub Bot commented on FLINK-10600:


pnowojski commented on a change in pull request #6924: [FLINK-10600] Provide 
End-to-end test cases for modern Kafka connectors
URL: https://github.com/apache/flink/pull/6924#discussion_r228178423
 
 

 ##
 File path: flink-end-to-end-tests/test-scripts/test-data/modern-kafka-common.sh
 ##
 @@ -0,0 +1,152 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+set -o pipefail
+
+if [[ -z $TEST_DATA_DIR ]]; then
+  echo "Must run common.sh before modern-kafka-common.sh."
+  exit 1
+fi
+
+KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-2.0.0
+CONFLUENT_DIR=$TEST_DATA_DIR/confluent-5.0.0
+SCHEMA_REGISTRY_PORT=8082
+SCHEMA_REGISTRY_URL=http://localhost:${SCHEMA_REGISTRY_PORT}
+
+function setup_kafka_dist {
 
 Review comment:
   For example, it looks like this method differs only in the `KAFKA_URL` "parameter".

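   If the two scripts really differ only in the download URL, the setup function could take the version as an argument and live in one shared file. The following is a minimal sketch under stated assumptions, not the change made in the pull request: the helper names `kafka_dist_url` and `fix_kafka_config` and the positional version arguments are hypothetical, and `TEST_DATA_DIR` is assumed to be set by `common.sh` as in the quoted script.

```shell
#!/usr/bin/env bash

# Hypothetical helper: builds the archive URL for a Kafka release.
function kafka_dist_url {
  local kafka_version=$1
  local scala_version=${2:-2.11}
  echo "https://archive.apache.org/dist/kafka/${kafka_version}/kafka_${scala_version}-${kafka_version}.tgz"
}

# Hypothetical helper: points ZooKeeper and Kafka at directories under $2,
# using the same sed expressions as the quoted script.
function fix_kafka_config {
  local kafka_dir=$1
  local data_dir=$2
  sed -i -e "s+^\(dataDir\s*=\s*\).*$+\1${data_dir}/zookeeper+" "${kafka_dir}/config/zookeeper.properties"
  sed -i -e "s+^\(log\.dirs\s*=\s*\).*$+\1${data_dir}/kafka+" "${kafka_dir}/config/server.properties"
}

# Shared setup, parameterised by version instead of duplicated per script.
# Assumes TEST_DATA_DIR was set by common.sh.
function setup_kafka_dist {
  local kafka_version=$1
  local scala_version=${2:-2.11}
  local url
  url=$(kafka_dist_url "$kafka_version" "$scala_version")
  KAFKA_DIR=$TEST_DATA_DIR/kafka_${scala_version}-${kafka_version}

  mkdir -p "$TEST_DATA_DIR"
  echo "Downloading Kafka from $url"
  curl "$url" > "$TEST_DATA_DIR/kafka.tgz"
  tar xzf "$TEST_DATA_DIR/kafka.tgz" -C "$TEST_DATA_DIR/"
  fix_kafka_config "$KAFKA_DIR" "$TEST_DATA_DIR"
}
```

   The modern test would then call `setup_kafka_dist 2.0.0`, while the older test passes its own version, so only the arguments differ between the two.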



[jira] [Commented] (FLINK-10600) Provide End-to-end test cases for modern Kafka connectors

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16663759#comment-16663759
 ] 

ASF GitHub Bot commented on FLINK-10600:


pnowojski commented on a change in pull request #6924: [FLINK-10600] Provide 
End-to-end test cases for modern Kafka connectors
URL: https://github.com/apache/flink/pull/6924#discussion_r228179323
 
 

 ##
 File path: flink-end-to-end-tests/test-scripts/test-data/modern-kafka-common.sh
 ##
 @@ -0,0 +1,152 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+set -o pipefail
+
+if [[ -z $TEST_DATA_DIR ]]; then
+  echo "Must run common.sh before modern-kafka-common.sh."
+  exit 1
+fi
+
+KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-2.0.0
+CONFLUENT_DIR=$TEST_DATA_DIR/confluent-5.0.0
+SCHEMA_REGISTRY_PORT=8082
+SCHEMA_REGISTRY_URL=http://localhost:${SCHEMA_REGISTRY_PORT}
+
+function setup_kafka_dist {
+  # download Kafka
+  mkdir -p $TEST_DATA_DIR
+  KAFKA_URL="https://archive.apache.org/dist/kafka/2.0.0/kafka_2.11-2.0.0.tgz"
+  echo "Downloading Kafka from $KAFKA_URL"
+  curl "$KAFKA_URL" > $TEST_DATA_DIR/modern-kafka.tgz
+
+  tar xzf $TEST_DATA_DIR/modern-kafka.tgz -C $TEST_DATA_DIR/
+
+  # fix kafka config
+  sed -i -e "s+^\(dataDir\s*=\s*\).*$+\1$TEST_DATA_DIR/zookeeper+" 
$KAFKA_DIR/config/zookeeper.properties
+  sed -i -e "s+^\(log\.dirs\s*=\s*\).*$+\1$TEST_DATA_DIR/kafka+" 
$KAFKA_DIR/config/server.properties
+}
+
+function setup_confluent_dist {
+  # download confluent
+  mkdir -p $TEST_DATA_DIR
+  
CONFLUENT_URL="http://packages.confluent.io/archive/5.0/confluent-oss-5.0.0-2.11.tar.gz"
+  echo "Downloading confluent from $CONFLUENT_URL"
+  curl "$CONFLUENT_URL" > $TEST_DATA_DIR/modern-confluent.tgz
+
+  tar xzf $TEST_DATA_DIR/modern-confluent.tgz -C $TEST_DATA_DIR/
+
+  # fix confluent config
+  sed -i -e "s#listeners=http://0.0.0.0:8081#listeners=http://0.0.0.0:${SCHEMA_REGISTRY_PORT}#" $CONFLUENT_DIR/etc/schema-registry/schema-registry.properties
+}
+
+function start_kafka_cluster {
+  if [[ -z $KAFKA_DIR ]]; then
+echo "Must run 'setup_kafka_dist' before attempting to start Kafka cluster"
+exit 1
+  fi
+
+  $KAFKA_DIR/bin/zookeeper-server-start.sh -daemon 
$KAFKA_DIR/config/zookeeper.properties
+  $KAFKA_DIR/bin/kafka-server-start.sh -daemon 
$KAFKA_DIR/config/server.properties
+
+  # zookeeper outputs the "Node does not exist" bit to stderr
+  while [[ $($KAFKA_DIR/bin/zookeeper-shell.sh localhost:2181 get 
/brokers/ids/0 2>&1) =~ .*Node\ does\ not\ exist.* ]]; do
+echo "Waiting for broker..."
+sleep 1
+  done
+}
+
+function stop_kafka_cluster {
+  $KAFKA_DIR/bin/kafka-server-stop.sh
+  $KAFKA_DIR/bin/zookeeper-server-stop.sh
+
+  PIDS=$(jps -vl | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk 
'{print $1}')
+
+  if [ ! -z "$PIDS" ]; then
+kill -s TERM $PIDS
+  fi
+
+  PIDS=$(jps -vl | grep java | grep -i QuorumPeerMain | grep -v grep | awk 
'{print $1}')
+
+  if [ ! -z "$PIDS" ]; then
+kill -s TERM $PIDS
+  fi
+}
+
+function create_kafka_topic {
+  $KAFKA_DIR/bin/kafka-topics.sh --create --zookeeper localhost:2181 
--replication-factor $1 --partitions $2 --topic $3
+}
+
+function send_messages_to_kafka {
+  echo -e $1 | $KAFKA_DIR/bin/kafka-console-producer.sh --broker-list 
localhost:9092 --topic $2
+}
+
+function read_messages_from_kafka {
+  $KAFKA_DIR/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 
--from-beginning \
+--max-messages $1 \
+--topic $2 \
+--consumer-property group.id=$3 2> /dev/null
+}
+
+function send_messages_to_kafka_avro {
 
 Review comment:
   Is this used anywhere?
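
   A quick way to answer this kind of question is to search the test scripts for references to the function. The sketch below is an illustration only: `find_function_usages` is a hypothetical helper, it assumes GNU grep, and it assumes functions are defined in the `function name {` style used in the quoted script.

```shell
#!/usr/bin/env bash

# Hypothetical helper: prints every line in the *.sh files under directory $2
# that mentions the shell function $1 as a whole word, skipping the line that
# defines it.
function find_function_usages {
  local name=$1
  local dir=$2
  grep -rnw --include='*.sh' -- "$name" "$dir" | grep -v "function ${name} {"
}
```

   For example, `find_function_usages send_messages_to_kafka_avro flink-end-to-end-tests/test-scripts` prints nothing if no script calls the function, which would suggest it can be dropped.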



[jira] [Commented] (FLINK-10600) Provide End-to-end test cases for modern Kafka connectors

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16663752#comment-16663752
 ] 

ASF GitHub Bot commented on FLINK-10600:


pnowojski commented on a change in pull request #6924: [FLINK-10600] Provide 
End-to-end test cases for modern Kafka connectors
URL: https://github.com/apache/flink/pull/6924#discussion_r228179894
 
 

 ##
 File path: flink-end-to-end-tests/test-scripts/test_streaming_kafka.sh
 ##
 @@ -0,0 +1,114 @@
+#!/usr/bin/env bash
 
 Review comment:
   Please deduplicate this as well.




[jira] [Commented] (FLINK-10600) Provide End-to-end test cases for modern Kafka connectors

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16663755#comment-16663755
 ] 

ASF GitHub Bot commented on FLINK-10600:


pnowojski commented on a change in pull request #6924: [FLINK-10600] Provide 
End-to-end test cases for modern Kafka connectors
URL: https://github.com/apache/flink/pull/6924#discussion_r228181171
 
 

 ##
 File path: 
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/KafkaExample.java
 ##
 @@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.kafka;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+
+/**
+ * A simple example that shows how to read from and write to modern Kafka. 
This will read String messages
+ * from the input topic, parse them into a POJO type {@link KafkaEvent}, group 
by some key, and finally
+ * perform a rolling addition on each key for which the results are written 
back to another topic.
+ *
+ * This example also demonstrates using a watermark assigner to generate 
per-partition
+ * watermarks directly in the Flink Kafka consumer. For demonstration 
purposes, it is assumed that
+ * the String messages are formatted as a (word,frequency,timestamp) tuple.
+ *
+ * Example usage:
+ * --input-topic test-input --output-topic test-output --bootstrap.servers 
localhost:9092
+ * --zookeeper.connect localhost:2181 --group.id myconsumer
+ */
+public class KafkaExample {
+
+   public static void main(String[] args) throws Exception {
 
 Review comment:
   Lines 43 to 60 could also be deduplicated.




[jira] [Commented] (FLINK-10600) Provide End-to-end test cases for modern Kafka connectors

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16663753#comment-16663753
 ] 

ASF GitHub Bot commented on FLINK-10600:


pnowojski commented on a change in pull request #6924: [FLINK-10600] Provide 
End-to-end test cases for modern Kafka connectors
URL: https://github.com/apache/flink/pull/6924#discussion_r228178228
 
 

 ##
 File path: flink-end-to-end-tests/test-scripts/test-data/modern-kafka-common.sh
 ##
 @@ -0,0 +1,152 @@
+#!/usr/bin/env bash
 
 Review comment:
   Please deduplicate the code here with `kafka-common.sh`



