[jira] [Commented] (FLINK-11824) Event-time attribute cannot have same name as in original format

2019-03-09 Thread Hequn Cheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11824?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16788860#comment-16788860
 ] 

Hequn Cheng commented on FLINK-11824:
-

+1. It would be better to be able to use the same name as the original field. 
That would also align with the behavior of {{StreamTableSource}} with 
{{DefinedRowtimeAttributes}}.
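
For reference, a minimal Java sketch of that behavior, assuming the legacy 
1.7/1.8 {{StreamTableSource}} interfaces (the class and the data are made up 
for illustration): with {{DefinedRowtimeAttributes}}, the rowtime attribute can 
keep the name of the physical field it is derived from.

{code:java}
import java.util.Collections;
import java.util.List;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sources.DefinedRowtimeAttributes;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.sources.tsextractors.ExistingField;
import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps;
import org.apache.flink.types.Row;

// Hypothetical table source, for illustration only.
public class NamesTableSource implements StreamTableSource<Row>, DefinedRowtimeAttributes {

	@Override
	public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
		// The rowtime attribute "t" is extracted from the physical field that is also named "t".
		return Collections.singletonList(
			new RowtimeAttributeDescriptor("t", new ExistingField("t"), new AscendingTimestamps()));
	}

	@Override
	public TableSchema getTableSchema() {
		// Logical schema: "t" is exposed as a timestamp rowtime attribute.
		return new TableSchema(
			new String[] {"name", "t"},
			new TypeInformation<?>[] {Types.STRING, Types.SQL_TIMESTAMP});
	}

	@Override
	public TypeInformation<Row> getReturnType() {
		// Physical type of the records: the timestamp arrives as a long.
		return Types.ROW_NAMED(new String[] {"name", "t"}, Types.STRING, Types.LONG);
	}

	@Override
	public DataStream<Row> getDataStream(StreamExecutionEnvironment env) {
		// A real source would read from Kafka; a fixed row keeps the sketch self-contained.
		return env.fromCollection(
			Collections.singletonList(Row.of("alice", 1552000000000L)), getReturnType());
	}

	@Override
	public String explainSource() {
		return "NamesTableSource";
	}
}
{code}

Registering this source via {{tEnv.registerTableSource("Names", new 
NamesTableSource())}} exposes {{t}} as a rowtime attribute without renaming it.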

> Event-time attribute cannot have same name as in original format
> 
>
> Key: FLINK-11824
> URL: https://issues.apache.org/jira/browse/FLINK-11824
> Project: Flink
>  Issue Type: Bug
>  Components: API / Table SQL
>Affects Versions: 1.7.2, 1.8.0
>Reporter: Fabian Hueske
>Priority: Minor
>
> When a table is defined, event-time attributes are typically defined by 
> linking them to an existing field in the original format (e.g., CSV, Avro, 
> JSON, ...). However, right now, the event-time attribute in the defined table 
> cannot have the same name as the original field.
> The following table definition fails with an exception:
> {code}
> // set up execution environment
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val tEnv = StreamTableEnvironment.create(env)
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> val names: Array[String] = Array("name", "t")
> val types: Array[TypeInformation[_]] = Array(Types.STRING, Types.LONG)
> tEnv.connect(new Kafka()
> .version("universal")
> .topic("namesTopic")
> .property("zookeeper.connect", "localhost:2181")
> .property("bootstrap.servers", "localhost:9092")
> .property("group.id", "testGroup"))
>   .withFormat(new Csv()
> .schema(Types.ROW(names, types)))
>   .withSchema(new Schema()
> .field("name", Types.STRING)
> .field("t", Types.SQL_TIMESTAMP) // changing "t" to "t2" works
>   .rowtime(new Rowtime()
> .timestampsFromField("t")
> .watermarksPeriodicAscending()))
>   .inAppendMode()
>   .registerTableSource("Names")
> {code}
> {code}
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> Field 't' could not be resolved by the field mapping.
>   at 
> org.apache.flink.table.sources.TableSourceUtil$.org$apache$flink$table$sources$TableSourceUtil$$resolveInputField(TableSourceUtil.scala:491)
>   at 
> org.apache.flink.table.sources.TableSourceUtil$$anonfun$org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields$1.apply(TableSourceUtil.scala:521)
>   at 
> org.apache.flink.table.sources.TableSourceUtil$$anonfun$org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields$1.apply(TableSourceUtil.scala:521)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
>   at 
> org.apache.flink.table.sources.TableSourceUtil$.org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields(TableSourceUtil.scala:521)
>   at 
> org.apache.flink.table.sources.TableSourceUtil$.validateTableSource(TableSourceUtil.scala:127)
>   at 
> org.apache.flink.table.plan.schema.StreamTableSourceTable.(StreamTableSourceTable.scala:33)
>   at 
> org.apache.flink.table.api.StreamTableEnvironment.registerTableSourceInternal(StreamTableEnvironment.scala:150)
>   at 
> org.apache.flink.table.api.TableEnvironment.registerTableSource(TableEnvironment.scala:541)
>   at 
> org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.scala:47)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11870) Code refactor for some details

2019-03-09 Thread shiwuliang (JIRA)
shiwuliang created FLINK-11870:
--

 Summary: Code refactor for some details
 Key: FLINK-11870
 URL: https://issues.apache.org/jira/browse/FLINK-11870
 Project: Flink
  Issue Type: Improvement
Reporter: shiwuliang


Recently, while reading the source code, I found some points that can still be 
optimized, for example:

 * use computeIfAbsent instead of the get-check-put pattern

 * remove extra, redundant calls

I would like to try to optimize them. These are just very small issues; a 
sketch of the first point follows.
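
A minimal sketch of the computeIfAbsent point (the class and field names are 
made up, not the actual Flink code in question):

{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical holder class, for illustration only.
class Index {
	private final Map<String, List<String>> valuesByKey = new HashMap<>();

	// Verbose pattern: look up, test for null, insert.
	void addVerbose(String key, String value) {
		List<String> values = valuesByKey.get(key);
		if (values == null) {
			values = new ArrayList<>();
			valuesByKey.put(key, values);
		}
		values.add(value);
	}

	// Same behavior in one call: the mapping function runs only when the key is absent.
	void addCompact(String key, String value) {
		valuesByKey.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
	}
}
{code}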



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] flinkbot commented on issue #7948: Code refactor for some details.

2019-03-09 Thread GitBox
flinkbot commented on issue #7948: Code refactor for some details.
URL: https://github.com/apache/flink/pull/7948#issuecomment-471254709
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

## Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] carryxyh opened a new pull request #7948: Code refactor for some details.

2019-03-09 Thread GitBox
carryxyh opened a new pull request #7948: Code refactor for some details.
URL: https://github.com/apache/flink/pull/7948
 
 
   ## What is the purpose of the change
   
   Code refactor for some details
   
   
   ## Brief change log
   
   * use `computeIfAbsent` to simplify code.
   * remove the duplicate call to `jobGraph.getJobID()` (see the sketch below)
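   
   A hypothetical sketch of the second bullet (the registry class is made up; 
   the PR touches different code): fetch the ID once into a local variable 
   instead of calling `jobGraph.getJobID()` repeatedly.
   
   ```java
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;
   
   import org.apache.flink.api.common.JobID;
   import org.apache.flink.runtime.jobgraph.JobGraph;
   
   // Hypothetical registry, for illustration only.
   class JobRegistry {
       private final Map<JobID, JobGraph> runningJobs = new ConcurrentHashMap<>();
   
       void register(JobGraph jobGraph) {
           // One lookup instead of repeating jobGraph.getJobID() for every use.
           JobID jobId = jobGraph.getJobID();
           runningJobs.put(jobId, jobGraph);
           System.out.println("Registered job " + jobId);
       }
   }
   ```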
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not documented)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] Tom-Goong commented on a change in pull request #7947: [FLINK-11867][Build System]Mistaking in checking filePath's value

2019-03-09 Thread GitBox
Tom-Goong commented on a change in pull request #7947: [FLINK-11867][Build 
System]Mistaking in checking filePath's value
URL: https://github.com/apache/flink/pull/7947#discussion_r264022486
 
 

 ##
 File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 ##
 @@ -958,8 +959,7 @@ public TimeCharacteristic getStreamTimeCharacteristic() {
 	 * @return The data stream that represents the data read from the given file as text lines
 	 */
 	public DataStreamSource<String> readTextFile(String filePath, String charsetName) {
-		Preconditions.checkNotNull(filePath, "The file path must not be null.");
-		Preconditions.checkNotNull(filePath.isEmpty(), "The file path must not be empty.");
+		Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(filePath), "The file path must not be blank.");
 
 Review comment:
   > Good catch and thanks for your contribution. Only two minor comments left; please consider modifying the error message.
   
   OK, I have modified it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-11869) [checkpoint] Make buffer size in checkpoint stream factory configurable

2019-03-09 Thread Yun Tang (JIRA)
Yun Tang created FLINK-11869:


 Summary: [checkpoint] Make buffer size in checkpoint stream 
factory configurable
 Key: FLINK-11869
 URL: https://issues.apache.org/jira/browse/FLINK-11869
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing
Reporter: Yun Tang
Assignee: Yun Tang
 Fix For: 1.9.0


Currently, the default buffer size for {{FsCheckpointStateOutputStream}} is 
only 4KB. This causes a lot of IOPS if the stream is large. Unfortunately, when 
users want to checkpoint to a fully disaggregated file system that has no data 
node running on the local machine, they might face an IOPS limit or a file 
system that cannot serve too many IOPS at a time. This can make the checkpoint 
duration very large, and checkpoints might expire often.
If we want a larger buffer today, we have to increase the 
{{fileStateThreshold}} to indirectly increase the buffer size. However, as we 
all know, returning too many not-so-small {{ByteStreamStateHandle}}s to the 
checkpoint coordinator can easily cause the job manager to OOM and the 
checkpoint meta file to grow large.

We should make the buffer size directly configurable; a sketch of the idea 
follows.
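
A minimal sketch of the underlying idea, using plain JDK I/O rather than 
Flink's actual checkpoint stream classes: with a 4KB buffer, a 400MB state 
stream issues roughly 100,000 writes against the file system, while a 4MB 
buffer brings that down to roughly 100.

{code:java}
import java.io.BufferedOutputStream;
import java.io.OutputStream;

// Illustration only: wrap the raw checkpoint output stream in a buffer of
// configurable size so that small writes are batched into fewer file system calls.
final class BufferedCheckpointStreams {
	static OutputStream wrap(OutputStream raw, int bufferSizeBytes) {
		return new BufferedOutputStream(raw, bufferSizeBytes);
	}
}
{code}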



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11868) [filesystems] Introduce listStatusIterator API to file system

2019-03-09 Thread Yun Tang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11868?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang updated FLINK-11868:
-
Description: 
From past experience, we know {{listStatus}} is expensive for many distributed 
file systems, especially when the folder contains too many files. This method 
not only blocks the thread until the result is returned but can also cause OOM 
because the returned array of {{FileStatus}} can be really large. We should 
already have learned this from FLINK-7266 and FLINK-8540.

However, listing file statuses under a path is really helpful in many 
situations. Thankfully, many distributed file systems have noticed this and 
provide APIs such as 
{{[listStatusIterator|https://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FileSystem.html#listStatusIterator(org.apache.hadoop.fs.Path)]}}
 that query the file system on demand.

We should introduce this API as well and replace the current implementations 
that use the old {{listStatus}}; a sketch follows.
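
A minimal sketch of the incremental listing style this proposes, using the 
Hadoop iterator API linked above (the path is illustrative):

{code:java}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

public final class ListStatusIteratorExample {
	public static void main(String[] args) throws Exception {
		FileSystem fs = FileSystem.get(new Configuration());
		// Statuses are fetched on demand instead of materializing one huge FileStatus[].
		RemoteIterator<FileStatus> it = fs.listStatusIterator(new Path("/checkpoints"));
		while (it.hasNext()) {
			FileStatus status = it.next();
			System.out.println(status.getPath() + " (" + status.getLen() + " bytes)");
		}
	}
}
{code}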

  was:
From past experience, we know {{listStatus}} is expensive for many distributed 
file systems, especially when the folder contains too many files. This method 
not only blocks the thread until the result is returned but can also cause OOM 
because the returned array of {{FileStatus}} can be really large. We should 
learn from FLINK-7266 and FLINK-8540.

However, listing file statuses under a path is really helpful in many 
situations. Thankfully, many distributed file systems have noticed this and 
provide APIs such as 
{{[listStatusIterator|https://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FileSystem.html#listStatusIterator(org.apache.hadoop.fs.Path)]}}
 that query the file system on demand.

We should introduce this API as well and replace the current implementations 
that use the old {{listStatus}}.


> [filesystems] Introduce listStatusIterator API to file system
> -
>
> Key: FLINK-11868
> URL: https://issues.apache.org/jira/browse/FLINK-11868
> Project: Flink
>  Issue Type: Improvement
>  Components: FileSystems
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Major
> Fix For: 1.9.0
>
>
> From past experience, we know {{listStatus}} is expensive for many 
> distributed file systems, especially when the folder contains too many files. 
> This method not only blocks the thread until the result is returned but can 
> also cause OOM because the returned array of {{FileStatus}} can be really large. 
> We should already have learned this from FLINK-7266 and FLINK-8540.
> However, listing file statuses under a path is really helpful in many situations. 
> Thankfully, many distributed file systems have noticed this and provide APIs such as 
> {{[listStatusIterator|https://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FileSystem.html#listStatusIterator(org.apache.hadoop.fs.Path)]}}
>  that query the file system on demand.
>  
> We should introduce this API as well and replace the current implementations 
> that use the old {{listStatus}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11868) [filesystems] Introduce listStatusIterator API to file system

2019-03-09 Thread Yun Tang (JIRA)
Yun Tang created FLINK-11868:


 Summary: [filesystems] Introduce listStatusIterator API to file 
system
 Key: FLINK-11868
 URL: https://issues.apache.org/jira/browse/FLINK-11868
 Project: Flink
  Issue Type: Improvement
  Components: FileSystems
Reporter: Yun Tang
Assignee: Yun Tang
 Fix For: 1.9.0


From past experience, we know {{listStatus}} is expensive for many distributed 
file systems, especially when the folder contains too many files. This method 
not only blocks the thread until the result is returned but can also cause OOM 
because the returned array of {{FileStatus}} can be really large. We should 
learn from FLINK-7266 and FLINK-8540.

However, listing file statuses under a path is really helpful in many 
situations. Thankfully, many distributed file systems have noticed this and 
provide APIs such as 
{{[listStatusIterator|https://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FileSystem.html#listStatusIterator(org.apache.hadoop.fs.Path)]}}
 that query the file system on demand.

We should introduce this API as well and replace the current implementations 
that use the old {{listStatus}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] Myasuka commented on a change in pull request #7947: [FLINK-11867][Build System]Mistaking in checking filePath's value

2019-03-09 Thread GitBox
Myasuka commented on a change in pull request #7947: [FLINK-11867][Build 
System]Mistaking in checking filePath's value
URL: https://github.com/apache/flink/pull/7947#discussion_r264007594
 
 

 ##
 File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 ##
 @@ -958,8 +959,7 @@ public TimeCharacteristic getStreamTimeCharacteristic() {
 	 * @return The data stream that represents the data read from the given file as text lines
 	 */
 	public DataStreamSource<String> readTextFile(String filePath, String charsetName) {
-		Preconditions.checkNotNull(filePath, "The file path must not be null.");
-		Preconditions.checkNotNull(filePath.isEmpty(), "The file path must not be empty.");
+		Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(filePath), "The file path must not be blank.");
 
 Review comment:
   How about changing the error message to `The file path must not be null or blank.`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] Myasuka commented on a change in pull request #7947: [FLINK-11867][Build System]Mistaking in checking filePath's value

2019-03-09 Thread GitBox
Myasuka commented on a change in pull request #7947: [FLINK-11867][Build 
System]Mistaking in checking filePath's value
URL: https://github.com/apache/flink/pull/7947#discussion_r264007604
 
 

 ##
 File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 ##
 @@ -1156,8 +1156,7 @@ public TimeCharacteristic getStreamTimeCharacteristic() {
 			TypeInformation<OUT> typeInformation) {
 
 		Preconditions.checkNotNull(inputFormat, "InputFormat must not be null.");
-		Preconditions.checkNotNull(filePath, "The file path must not be null.");
-		Preconditions.checkNotNull(filePath.isEmpty(), "The file path must not be empty.");
+		Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(filePath), "The file path must not be blank.");
 
 Review comment:
   Same here: how about changing the error message to `The file path must not be null or blank.`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-11865) Code generation in TraversableSerializer is prohibitively slow

2019-03-09 Thread Robert Metzger (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-11865:
---
Component/s: API / Type Serialization System

> Code generation in TraversableSerializer is prohibitively slow
> --
>
> Key: FLINK-11865
> URL: https://issues.apache.org/jira/browse/FLINK-11865
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System
>Reporter: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.8.0
>
>
> As discussed in FLINK-11539, the new code generation makes job 
> submission/translation prohibitively slow.
> The solution should be to introduce a cache for the generated code.
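
A hedged sketch of such a cache (the names are hypothetical, not Flink's actual 
fix): compile each distinct generated snippet once and reuse it, keyed by the 
generated source string.

{code:java}
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Illustration only: the expensive compile step runs at most once per distinct
// code string, even when many operators request the same generated serializer.
final class CompiledCodeCache {
	private static final ConcurrentHashMap<String, Class<?>> CACHE = new ConcurrentHashMap<>();

	static Class<?> getOrCompile(String code, Function<String, Class<?>> compiler) {
		return CACHE.computeIfAbsent(code, compiler);
	}
}
{code}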



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] flinkbot commented on issue #7947: [FLINK-11867][Build System]Mistaking in checking filePath's value

2019-03-09 Thread GitBox
flinkbot commented on issue #7947: [FLINK-11867][Build System]Mistaking in 
checking filePath's value
URL: https://github.com/apache/flink/pull/7947#issuecomment-471196640
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

## Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-11867) mistaking in checking filePath's value

2019-03-09 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11867?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-11867:
---
Labels: pull-request-available  (was: )

> mistaking in checking filePath's value
> --
>
> Key: FLINK-11867
> URL: https://issues.apache.org/jira/browse/FLINK-11867
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Reporter: Tom Goong
>Assignee: Tom Goong
>Priority: Major
>  Labels: pull-request-available
>
> In class StreamExecutionEnvironment
>  
> {code:java}
> // code placeholder
> public DataStreamSource<String> readTextFile(String filePath, String charsetName) {
> Preconditions.checkNotNull(filePath, "The file path must not be null.");
> Preconditions.checkNotNull(filePath.isEmpty(), "The file path must not be empty.");
> TextInputFormat format = new TextInputFormat(new Path(filePath));
> format.setFilesFilter(FilePathFilter.createDefaultFilter());
> TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
> format.setCharsetName(charsetName);
> return readFile(format, filePath, FileProcessingMode.PROCESS_ONCE, -1, typeInfo);
> }
> {code}
>  
> The *Preconditions.checkNotNull(filePath.isEmpty(), "The file path must not 
> be empty.");* check will not work.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] Tom-Goong opened a new pull request #7947: [FLINK-11867][Build System]Mistaking in checking filePath's value

2019-03-09 Thread GitBox
Tom-Goong opened a new pull request #7947: [FLINK-11867][Build System]Mistaking 
in checking filePath's value
URL: https://github.com/apache/flink/pull/7947
 
 
   ## What is the purpose of the change
   `Preconditions.checkNotNull(filePath, "The file path must not be null.");`
   `Preconditions.checkNotNull(filePath.isEmpty(), "The file path must not be empty.");`
   The second line has no effect (see the sketch below).
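   
   A minimal demonstration of why (plain Java; `CheckDemo` is only an 
   illustration): `filePath.isEmpty()` returns a `boolean` that is autoboxed to 
   a non-null `Boolean`, so `checkNotNull` always passes, even for an empty string.
   
   ```java
   import org.apache.flink.util.Preconditions;
   
   public class CheckDemo {
       public static void main(String[] args) {
           String filePath = "";
           // Passes: the boxed Boolean is never null, so this check cannot fail.
           Preconditions.checkNotNull(filePath.isEmpty(), "never thrown");
           // What was intended: reject an empty path (throws IllegalArgumentException here).
           Preconditions.checkArgument(!filePath.isEmpty(), "The file path must not be empty.");
       }
   }
   ```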
   
   ## Brief change log
   * Change the check of filePath's value and the error message
   
   ## Does this pull request potentially affect one of the following parts:
   * Dependencies (does it add or upgrade a dependency): **no**
   * The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
   * The serializers: **no**
   * The runtime per-record code paths (performance sensitive): **no**
   * Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
   * The S3 file system connector: **no**
   
   ## Documentation
   * Does this pull request introduce a new feature? **no**


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-11867) mistaking in checking filePath's value

2019-03-09 Thread Tom Goong (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11867?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tom Goong updated FLINK-11867:
--
Description: 
In class StreamExecutionEnvironment

 
{code:java}
// code placeholder
public DataStreamSource<String> readTextFile(String filePath, String charsetName) {
Preconditions.checkNotNull(filePath, "The file path must not be null.");
Preconditions.checkNotNull(filePath.isEmpty(), "The file path must not be empty.");

TextInputFormat format = new TextInputFormat(new Path(filePath));
format.setFilesFilter(FilePathFilter.createDefaultFilter());
TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
format.setCharsetName(charsetName);

return readFile(format, filePath, FileProcessingMode.PROCESS_ONCE, -1, typeInfo);
}
{code}
 

The *Preconditions.checkNotNull(filePath.isEmpty(), "The file path must not be 
empty.");* check will not work.

  was:
In class StreamExecutionEnvironment

 
{code:java}
// code placeholder
public DataStreamSource<String> readTextFile(String filePath, String charsetName) {
Preconditions.checkNotNull(filePath, "The file path must not be null.");
Preconditions.checkNotNull(filePath.isEmpty(), "The file path must not be empty.");

TextInputFormat format = new TextInputFormat(new Path(filePath));
format.setFilesFilter(FilePathFilter.createDefaultFilter());
TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
format.setCharsetName(charsetName);

return readFile(format, filePath, FileProcessingMode.PROCESS_ONCE, -1, typeInfo);
}
{code}
 

The `Preconditions.checkNotNull(filePath.isEmpty(), "The file path must not be 
empty.");` check will not work.


> mistaking in checking filePath's value
> --
>
> Key: FLINK-11867
> URL: https://issues.apache.org/jira/browse/FLINK-11867
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Reporter: Tom Goong
>Assignee: Tom Goong
>Priority: Major
>
> In class StreamExecutionEnvironment
>  
> {code:java}
> // code placeholder
> public DataStreamSource<String> readTextFile(String filePath, String charsetName) {
> Preconditions.checkNotNull(filePath, "The file path must not be null.");
> Preconditions.checkNotNull(filePath.isEmpty(), "The file path must not be empty.");
> TextInputFormat format = new TextInputFormat(new Path(filePath));
> format.setFilesFilter(FilePathFilter.createDefaultFilter());
> TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
> format.setCharsetName(charsetName);
> return readFile(format, filePath, FileProcessingMode.PROCESS_ONCE, -1, typeInfo);
> }
> {code}
>  
> The *Preconditions.checkNotNull(filePath.isEmpty(), "The file path must not 
> be empty.");* check will not work.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11866) [py] Python API Data Types section documentation outdated

2019-03-09 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11866?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-11866:
---
Labels: pull-request-available  (was: )

> [py] Python API Data Types section documentation outdated
> -
>
> Key: FLINK-11866
> URL: https://issues.apache.org/jira/browse/FLINK-11866
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python, Documentation
>Affects Versions: 1.7.2
>Reporter: romano vacca
>Priority: Minor
>  Labels: pull-request-available
>
> The documentation about the data types for the Batch Python API is outdated. 
>  
> {code:java}
> env.register_custom_type(MyObj, MySerializer(), MyDeserializer())
> {code}
> should be:
> {code:java}
> env.register_type(MyObj, MySerializer(), MyDeserializer())
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] romanovacca opened a new pull request #7946: [FLINK-11866] documentation still shows old function

2019-03-09 Thread GitBox
romanovacca opened a new pull request #7946: [FLINK-11866] documentation still 
shows old function
URL: https://github.com/apache/flink/pull/7946
 
 
   ## What is the purpose of the change
   
   *Improving the documentation*
   
   
   ## Brief change log
 - *changed the outdated example function **register_custom_type** to 
**register_type***
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #7946: [FLINK-11866] documentation still shows old function

2019-03-09 Thread GitBox
flinkbot commented on issue #7946: [FLINK-11866] documentation still shows old 
function
URL: https://github.com/apache/flink/pull/7946#issuecomment-471196060
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

## Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-11867) mistaking in checking filePath's value

2019-03-09 Thread Tom Goong (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11867?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tom Goong updated FLINK-11867:
--
Description: 
In class StreamExecutionEnvironment

 
{code:java}
// code placeholder
public DataStreamSource<String> readTextFile(String filePath, String charsetName) {
Preconditions.checkNotNull(filePath, "The file path must not be null.");
Preconditions.checkNotNull(filePath.isEmpty(), "The file path must not be empty.");

TextInputFormat format = new TextInputFormat(new Path(filePath));
format.setFilesFilter(FilePathFilter.createDefaultFilter());
TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
format.setCharsetName(charsetName);

return readFile(format, filePath, FileProcessingMode.PROCESS_ONCE, -1, typeInfo);
}
{code}
 

The `Preconditions.checkNotNull(filePath.isEmpty(), "The file path must not be 
empty.");` check will not work.

> mistaking in checking filePath's value
> --
>
> Key: FLINK-11867
> URL: https://issues.apache.org/jira/browse/FLINK-11867
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Reporter: Tom Goong
>Assignee: Tom Goong
>Priority: Major
>
> In class StreamExecutionEnvironment
>  
> {code:java}
> // code placeholder
> public DataStreamSource<String> readTextFile(String filePath, String charsetName) {
> Preconditions.checkNotNull(filePath, "The file path must not be null.");
> Preconditions.checkNotNull(filePath.isEmpty(), "The file path must not be empty.");
> TextInputFormat format = new TextInputFormat(new Path(filePath));
> format.setFilesFilter(FilePathFilter.createDefaultFilter());
> TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
> format.setCharsetName(charsetName);
> return readFile(format, filePath, FileProcessingMode.PROCESS_ONCE, -1, typeInfo);
> }
> {code}
>  
> The `Preconditions.checkNotNull(filePath.isEmpty(), "The file path must not 
> be empty.");` check will not work.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11866) [py] Python API Data Types section documentation outdated

2019-03-09 Thread romano vacca (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11866?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

romano vacca updated FLINK-11866:
-
Summary: [py] Python API Data Types section documentation outdated  (was: 
[py] Pathon API Data Types section documentation outdated)

> [py] Python API Data Types section documentation outdated
> -
>
> Key: FLINK-11866
> URL: https://issues.apache.org/jira/browse/FLINK-11866
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python, Documentation
>Affects Versions: 1.7.2
>Reporter: romano vacca
>Priority: Minor
>
> The documentation about the data types for the Batch Python API is outdated. 
>  
> {code:java}
> env.register_custom_type(MyObj, MySerializer(), MyDeserializer())
> {code}
> should be:
> {code:java}
> env.register_type(MyObj, MySerializer(), MyDeserializer())
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11867) mistaking in checking filePath's value

2019-03-09 Thread Tom Goong (JIRA)
Tom Goong created FLINK-11867:
-

 Summary: mistaking in checking filePath's value
 Key: FLINK-11867
 URL: https://issues.apache.org/jira/browse/FLINK-11867
 Project: Flink
  Issue Type: Bug
  Components: Build System
Reporter: Tom Goong
Assignee: Tom Goong






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11866) [py] Pathon API Data Types section documentation outdated

2019-03-09 Thread romano vacca (JIRA)
romano vacca created FLINK-11866:


 Summary: [py] Pathon API Data Types section documentation outdated
 Key: FLINK-11866
 URL: https://issues.apache.org/jira/browse/FLINK-11866
 Project: Flink
  Issue Type: Bug
  Components: API / Python, Documentation
Affects Versions: 1.7.2
Reporter: romano vacca


The documentation about the data types for the Batch Python API is outdated. 

 
{code:java}
env.register_custom_type(MyObj, MySerializer(), MyDeserializer())
{code}
should be:
{code:java}
env.register_type(MyObj, MySerializer(), MyDeserializer())
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] Xeli commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2019-03-09 Thread GitBox
Xeli commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added 
PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r263999310
 
 

 ##
 File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SerializedCheckpointData.java
 ##
 @@ -140,20 +145,21 @@ public int getNumIds() {
 	// ------------------------------------------------------------------------
 
 	/**
-	 * De-serializes an array of SerializedCheckpointData back into an ArrayDeque of element checkpoints.
+	 * De-serializes an array of SerializedCheckpointData back into
+	 * a Map of element checkpoints with the checkpointId as key.
 	 *
 	 * @param data The data to be deserialized.
 	 * @param serializer The serializer used to deserialize the data.
 	 * @param <T> The type of the elements.
-	 * @return An ArrayDeque of element checkpoints.
+	 * @return A Map of element checkpoints.
 	 *
 	 * @throws IOException Thrown, if the serialization fails.
 	 */
-	public static <T> ArrayDeque<Tuple2<Long, Set<T>>> toDeque(
+	public static <T> Map<Long, Set<T>> toDeque(
 
 Review comment:
   Yeah good catch, changed it to `toMap()`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] Xeli commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2019-03-09 Thread GitBox
Xeli commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added 
PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r263999268
 
 

 ##
 File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SerializedCheckpointData.java
 ##
 @@ -172,9 +178,44 @@ public int getNumIds() {
 				ids.add(serializer.deserialize(deser));
 			}
 
-			deque.addLast(new Tuple2<Long, Set<T>>(checkpoint.checkpointId, ids));
+			map.put(checkpoint.checkpointId, ids);
 		}
 
+		return map;
+	}
+
+	/**
+	 * Combines multiple ArrayDeques with checkpoint data by checkpointId.
+	 * This could happen when a job rescales to a lower parallelism and the states of multiple tasks are combined.
+	 *
+	 * @param data The data to be combined.
+	 * @param <T> The type of the elements.
+	 * @return An ArrayDeque of combined element checkpoints.
+	 */
+	public static <T> ArrayDeque<Tuple2<Long, Set<T>>> combine(List<Map<Long, Set<T>>> data) {
+		Map<Long, Set<T>> accumulator = new HashMap<>();
 
 Review comment:
   Changed it :)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] Xeli commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2019-03-09 Thread GitBox
Xeli commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added 
PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r263999125
 
 

 ##
 File path: flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSource.java
 ##
 @@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.gcp.pubsub;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.StoppableFunction;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.streaming.api.functions.source.MultipleIdsMessageAcknowledgingSourceBase;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory;
+import org.apache.flink.util.Preconditions;
+
+import com.google.api.core.ApiFuture;
+import com.google.auth.Credentials;
+import com.google.cloud.pubsub.v1.Subscriber;
+import com.google.cloud.pubsub.v1.stub.SubscriberStub;
+import com.google.pubsub.v1.AcknowledgeRequest;
+import com.google.pubsub.v1.ProjectSubscriptionName;
+import com.google.pubsub.v1.PubsubMessage;
+import com.google.pubsub.v1.PullRequest;
+import com.google.pubsub.v1.PullResponse;
+import com.google.pubsub.v1.ReceivedMessage;
+import io.grpc.netty.shaded.io.netty.channel.EventLoopGroup;
+import io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoopGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.TimeUnit;
+
+import static 
com.google.cloud.pubsub.v1.SubscriptionAdminSettings.defaultCredentialsProviderBuilder;
+
+/**
+ * PubSub Source, this Source will consume PubSub messages from a subscription 
and Acknowledge them on the next checkpoint.
+ * This ensures every message will get acknowledged at least once.
+ */
+public class PubSubSource extends 
MultipleIdsMessageAcknowledgingSourceBase
+   implements ResultTypeQueryable, 
ParallelSourceFunction, StoppableFunction {
+   private static final Logger LOG = 
LoggerFactory.getLogger(PubSubSource.class);
+   protected final DeserializationSchema deserializationSchema;
+   protected final PubSubSubscriberFactory pubSubSubscriberFactory;
+   protected final Credentials credentials;
+   protected final String projectSubscriptionName;
+   protected final int maxMessagesPerPull;
+
+   protected transient boolean deduplicateMessages;
+   protected transient SubscriberStub subscriber;
+   protected transient PullRequest pullRequest;
+   protected transient EventLoopGroup eventLoopGroup;
+
+   protected transient volatile boolean isRunning;
+   protected transient volatile ApiFuture messagesFuture;
+
+   PubSubSource(DeserializationSchema deserializationSchema, 
PubSubSubscriberFactory pubSubSubscriberFactory, Credentials credentials, 
String projectSubscriptionName, int maxMessagesPerPull) {
+   super(String.class);
+   this.deserializationSchema = deserializationSchema;
+   this.pubSubSubscriberFactory = pubSubSubscriberFactory;
+   this.credentials = credentials;
+   this.projectSubscriptionName = projectSubscriptionName;
+   this.maxMessagesPerPull = maxMessagesPerPull;
+   }
+
+   @Override
+   public void open(Configuration configuration) throws Exception {
+   super.open(configuration);
+   if (hasNoCheckpointingEnabled(getRuntimeContext())) {
+   throw new IllegalArgumentException("The PubSubSource 
REQUIRES Checkpointing to be enabled and " +
+   "the 

[GitHub] [flink] Xeli commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2019-03-09 Thread GitBox
Xeli commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added 
PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r263999125
 
 

 ##
 File path: flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSource.java
 ##
 @@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.gcp.pubsub;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.StoppableFunction;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.streaming.api.functions.source.MultipleIdsMessageAcknowledgingSourceBase;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory;
+import org.apache.flink.util.Preconditions;
+
+import com.google.api.core.ApiFuture;
+import com.google.auth.Credentials;
+import com.google.cloud.pubsub.v1.Subscriber;
+import com.google.cloud.pubsub.v1.stub.SubscriberStub;
+import com.google.pubsub.v1.AcknowledgeRequest;
+import com.google.pubsub.v1.ProjectSubscriptionName;
+import com.google.pubsub.v1.PubsubMessage;
+import com.google.pubsub.v1.PullRequest;
+import com.google.pubsub.v1.PullResponse;
+import com.google.pubsub.v1.ReceivedMessage;
+import io.grpc.netty.shaded.io.netty.channel.EventLoopGroup;
+import io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoopGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.TimeUnit;
+
+import static 
com.google.cloud.pubsub.v1.SubscriptionAdminSettings.defaultCredentialsProviderBuilder;
+
+/**
+ * PubSub Source, this Source will consume PubSub messages from a subscription 
and Acknowledge them on the next checkpoint.
+ * This ensures every message will get acknowledged at least once.
+ */
+public class PubSubSource extends 
MultipleIdsMessageAcknowledgingSourceBase
+   implements ResultTypeQueryable, 
ParallelSourceFunction, StoppableFunction {
+   private static final Logger LOG = 
LoggerFactory.getLogger(PubSubSource.class);
+   protected final DeserializationSchema deserializationSchema;
+   protected final PubSubSubscriberFactory pubSubSubscriberFactory;
+   protected final Credentials credentials;
+   protected final String projectSubscriptionName;
+   protected final int maxMessagesPerPull;
+
+   protected transient boolean deduplicateMessages;
+   protected transient SubscriberStub subscriber;
+   protected transient PullRequest pullRequest;
+   protected transient EventLoopGroup eventLoopGroup;
+
+   protected transient volatile boolean isRunning;
+   protected transient volatile ApiFuture messagesFuture;
+
+   PubSubSource(DeserializationSchema deserializationSchema, 
PubSubSubscriberFactory pubSubSubscriberFactory, Credentials credentials, 
String projectSubscriptionName, int maxMessagesPerPull) {
+   super(String.class);
+   this.deserializationSchema = deserializationSchema;
+   this.pubSubSubscriberFactory = pubSubSubscriberFactory;
+   this.credentials = credentials;
+   this.projectSubscriptionName = projectSubscriptionName;
+   this.maxMessagesPerPull = maxMessagesPerPull;
+   }
+
+   @Override
+   public void open(Configuration configuration) throws Exception {
+   super.open(configuration);
+   if (hasNoCheckpointingEnabled(getRuntimeContext())) {
+   throw new IllegalArgumentException("The PubSubSource 
REQUIRES Checkpointing to be enabled and " +
+   "the 

[GitHub] [flink] Xeli commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2019-03-09 Thread GitBox
Xeli commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added 
PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r263999050
 
 

 ##
 File path: docs/dev/connectors/pubsub.md
 ##
 @@ -0,0 +1,142 @@
+---
+title: "Google PubSub"
+nav-title: PubSub
+nav-parent_id: connectors
+nav-pos: 7
+---
+
+
+This connector provides a Source and Sink that can read from and write to
+[Google PubSub](https://cloud.google.com/pubsub). To use this connector, add 
the
+following dependency to your project:
+
+{% highlight xml %}
+
+  org.apache.flink
+  flink-connector-gcp-pubsub{{ site.scala_version_suffix 
}}
+  {{ site.version }}
+
+{% endhighlight %}
+
+Note that the streaming connectors are currently not part of the binary
+distribution. See
+[here]({{ site.baseurl }}/dev/projectsetup/dependencies.html)
+for information about how to package the program with the libraries for
+cluster execution.
+
+## Consuming or Producing PubSubMessages
+
+The connector provides connectors for receiving and sending messages from and to Google PubSub.
+Google PubSub has an `Atleast-Once` guarantee and as such the connector 
delivers the same guarantees.
+
+### PubSub SourceFunction
 
 Review comment:
   Yeah makes sense! I've added it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] Xeli commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2019-03-09 Thread GitBox
Xeli commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added 
PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r263998745
 
 

 ##
 File path: flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSink.java
 ##
 @@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.gcp.pubsub;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+
+import com.google.api.gax.core.FixedCredentialsProvider;
+import com.google.api.gax.core.NoCredentialsProvider;
+import com.google.api.gax.grpc.GrpcTransportChannel;
+import com.google.api.gax.rpc.FixedTransportChannelProvider;
+import com.google.api.gax.rpc.TransportChannel;
+import com.google.auth.Credentials;
+import com.google.cloud.pubsub.v1.Publisher;
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.ProjectTopicName;
+import com.google.pubsub.v1.PubsubMessage;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import static 
com.google.cloud.pubsub.v1.SubscriptionAdminSettings.defaultCredentialsProviderBuilder;
+
+/**
+ * A sink function that outputs to PubSub.
+ *
+ * @param  type of PubSubSink messages to write
+ */
+public class PubSubSink extends RichSinkFunction implements 
CheckpointedFunction {
+   private static final Logger LOG = 
LoggerFactory.getLogger(PubSubSink.class);
+
+   private final Credentials credentials;
+   private final SerializationSchema serializationSchema;
+   private final String projectName;
+   private final String topicName;
+   private final String hostAndPortForEmulator;
+
+   private transient Publisher publisher;
+
+   private PubSubSink(
+   Credentials credentials,
+   SerializationSchema serializationSchema,
+   String projectName,
+   String topicName,
+   String hostAndPortForEmulator) {
+   this.credentials = credentials;
+   this.serializationSchema = serializationSchema;
+   this.projectName = projectName;
+   this.topicName = topicName;
+   this.hostAndPortForEmulator = hostAndPortForEmulator;
+   }
+
+   private transient ManagedChannel managedChannel = null;
+   private transient TransportChannel channel = null;
+
+   @Override
+   public void open(Configuration configuration) throws Exception {
+   Publisher.Builder builder = Publisher
+   .newBuilder(ProjectTopicName.of(projectName, topicName))
+   
.setCredentialsProvider(FixedCredentialsProvider.create(credentials));
+
+   if (hostAndPortForEmulator != null) {
+   managedChannel = ManagedChannelBuilder
+   .forTarget(hostAndPortForEmulator)
+   .usePlaintext(true) // This is 'Ok' because 
this is ONLY used for testing.
+   .build();
+   channel = 
GrpcTransportChannel.newBuilder().setManagedChannel(managedChannel).build();
+   
builder.setChannelProvider(FixedTransportChannelProvider.create(channel))
+   
.setCredentialsProvider(NoCredentialsProvider.create());
+   }
+
+   publisher = builder.build();
+   }
+
+   @Override
+   public void close() throws Exception {
+   super.close();
+   shutdownPublisher();
+   shutdownTransportCh

[jira] [Commented] (FLINK-11539) Add TypeSerializerSnapshot for TraversableSerializer

2019-03-09 Thread Aljoscha Krettek (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16788666#comment-16788666
 ] 

Aljoscha Krettek commented on FLINK-11539:
--

I created FLINK-11865

> Add TypeSerializerSnapshot for TraversableSerializer
> 
>
> Key: FLINK-11539
> URL: https://issues.apache.org/jira/browse/FLINK-11539
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Type Serialization System
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> This will replace the deprecated {{TypeSerializerConfigSnapshot}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11865) Code generation in TraversableSerializer is prohibitively slow

2019-03-09 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-11865:


 Summary: Code generation in TraversableSerializer is prohibitively 
slow
 Key: FLINK-11865
 URL: https://issues.apache.org/jira/browse/FLINK-11865
 Project: Flink
  Issue Type: Bug
Reporter: Aljoscha Krettek
 Fix For: 1.8.0


As discussed in FLINK-11539, the new code generation makes job 
submission/translation prohibitively slow.

The solution should be to introduce a cache for the generated code.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] Xeli commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2019-03-09 Thread GitBox
Xeli commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added 
PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r263998224
 
 

 ##
 File path: flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSource.java
 ##
 @@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.gcp.pubsub;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.StoppableFunction;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.MultipleIdsMessageAcknowledgingSourceBase;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory;
+import org.apache.flink.util.Preconditions;
+
+import com.google.api.core.ApiFuture;
+import com.google.auth.Credentials;
+import com.google.cloud.pubsub.v1.Subscriber;
+import com.google.cloud.pubsub.v1.stub.SubscriberStub;
+import com.google.pubsub.v1.AcknowledgeRequest;
+import com.google.pubsub.v1.ProjectSubscriptionName;
+import com.google.pubsub.v1.PubsubMessage;
+import com.google.pubsub.v1.PullRequest;
+import com.google.pubsub.v1.PullResponse;
+import com.google.pubsub.v1.ReceivedMessage;
+import io.grpc.netty.shaded.io.netty.channel.EventLoopGroup;
+import io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoopGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.TimeUnit;
+
+import static com.google.cloud.pubsub.v1.SubscriptionAdminSettings.defaultCredentialsProviderBuilder;
+
+/**
+ * PubSub Source, this Source will consume PubSub messages from a subscription and Acknowledge them on the next checkpoint.
+ * This ensures every message will get acknowledged at least once.
+ */
+public class PubSubSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OUT, String, String>
+   implements ResultTypeQueryable<OUT>, ParallelSourceFunction<OUT>, StoppableFunction {
+   private static final Logger LOG = LoggerFactory.getLogger(PubSubSource.class);
+   protected final DeserializationSchema<OUT> deserializationSchema;
+   protected final PubSubSubscriberFactory pubSubSubscriberFactory;
+   protected final Credentials credentials;
+   protected final String projectSubscriptionName;
+   protected final int maxMessagesPerPull;
+
+   protected transient boolean deduplicateMessages;
+   protected transient SubscriberStub subscriber;
+   protected transient PullRequest pullRequest;
+   protected transient EventLoopGroup eventLoopGroup;
+
+   protected transient volatile boolean isRunning;
+   protected transient volatile ApiFuture<PullResponse> messagesFuture;
+
+   PubSubSource(DeserializationSchema<OUT> deserializationSchema, PubSubSubscriberFactory pubSubSubscriberFactory, Credentials credentials, String projectSubscriptionName, int maxMessagesPerPull) {
+   super(String.class);
+   this.deserializationSchema = deserializationSchema;
+   this.pubSubSubscriberFactory = pubSubSubscriberFactory;
+   this.credentials = credentials;
+   this.projectSubscriptionName = projectSubscriptionName;
+   this.maxMessagesPerPull = maxMessagesPerPull;
+   }
+
+   @Override
+   public void open(Configuration configuration) throws Exception {
+   super.open(configuration);
+   if (hasNoCheckpointingEnabled(getRuntimeContext())) {
+   throw new IllegalArgumentException("The PubSubSource REQUIRES Checkpointing to be enabled and " +
+   "the 

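A minimal sketch of the acknowledge-on-checkpoint bookkeeping the Javadoc above describes 
(names here are illustrative; the real logic lives in MultipleIdsMessageAcknowledgingSourceBase): 
ids are held back until the checkpoint that covers them completes, so a failure in between 
leads to redelivery rather than loss -- that is, at-least-once.

{code}
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class AckOnCheckpointTracker {

	private static final class Pending {
		final long checkpointId;
		final List<String> ackIds;

		Pending(long checkpointId, List<String> ackIds) {
			this.checkpointId = checkpointId;
			this.ackIds = ackIds;
		}
	}

	// Ids received since the last checkpoint barrier.
	private List<String> idsSinceLastCheckpoint = new ArrayList<>();
	// Batches waiting for their checkpoint to complete.
	private final Deque<Pending> pending = new ArrayDeque<>();

	void onMessage(String ackId) {
		idsSinceLastCheckpoint.add(ackId);
	}

	// Called when a checkpoint barrier passes: freeze the current batch.
	void onCheckpoint(long checkpointId) {
		pending.addLast(new Pending(checkpointId, idsSinceLastCheckpoint));
		idsSinceLastCheckpoint = new ArrayList<>();
	}

	// Called when the checkpoint is confirmed complete: only now is it safe
	// to acknowledge the ids against PubSub.
	List<String> onCheckpointComplete(long checkpointId) {
		List<String> toAck = new ArrayList<>();
		while (!pending.isEmpty() && pending.peekFirst().checkpointId <= checkpointId) {
			toAck.addAll(pending.pollFirst().ackIds);
		}
		return toAck;
	}
}
{code}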
[GitHub] [flink] Xeli commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2019-03-09 Thread GitBox
Xeli commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added 
PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r263998235
 
 

 ##
 File path: flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSink.java
 ##
 @@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.gcp.pubsub;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+
+import com.google.api.gax.core.FixedCredentialsProvider;
+import com.google.api.gax.core.NoCredentialsProvider;
+import com.google.api.gax.grpc.GrpcTransportChannel;
+import com.google.api.gax.rpc.FixedTransportChannelProvider;
+import com.google.api.gax.rpc.TransportChannel;
+import com.google.auth.Credentials;
+import com.google.cloud.pubsub.v1.Publisher;
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.ProjectTopicName;
+import com.google.pubsub.v1.PubsubMessage;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import static com.google.cloud.pubsub.v1.SubscriptionAdminSettings.defaultCredentialsProviderBuilder;
+
+/**
+ * A sink function that outputs to PubSub.
+ *
+ * @param <IN> type of PubSubSink messages to write
+ */
+public class PubSubSink<IN> extends RichSinkFunction<IN> implements CheckpointedFunction {
+   private static final Logger LOG = LoggerFactory.getLogger(PubSubSink.class);
+
+   private final Credentials credentials;
+   private final SerializationSchema<IN> serializationSchema;
+   private final String projectName;
+   private final String topicName;
+   private final String hostAndPortForEmulator;
+
+   private transient Publisher publisher;
+
+   private PubSubSink(
+   Credentials credentials,
+   SerializationSchema<IN> serializationSchema,
+   String projectName,
+   String topicName,
+   String hostAndPortForEmulator) {
+   this.credentials = credentials;
+   this.serializationSchema = serializationSchema;
+   this.projectName = projectName;
+   this.topicName = topicName;
+   this.hostAndPortForEmulator = hostAndPortForEmulator;
+   }
+
+   private transient ManagedChannel managedChannel = null;
+   private transient TransportChannel channel = null;
+
+   @Override
+   public void open(Configuration configuration) throws Exception {
+   Publisher.Builder builder = Publisher
+   .newBuilder(ProjectTopicName.of(projectName, topicName))
+   .setCredentialsProvider(FixedCredentialsProvider.create(credentials));
+
+   if (hostAndPortForEmulator != null) {
+   managedChannel = ManagedChannelBuilder
+   .forTarget(hostAndPortForEmulator)
+   .usePlaintext(true) // This is 'Ok' because this is ONLY used for testing.
+   .build();
+   channel = GrpcTransportChannel.newBuilder().setManagedChannel(managedChannel).build();
+   builder.setChannelProvider(FixedTransportChannelProvider.create(channel))
+   .setCredentialsProvider(NoCredentialsProvider.create());
+   }
+
+   publisher = builder.build();
+   }
+
+   @Override
+   public void close() throws Exception {
 
 Review comment:
   See my comment in the source

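The close() above is cut off in this archive; from the helper names that survive 
(shutdownPublisher, shutdownTransportChannel), the cleanup plausibly looks like the 
following sketch. The method bodies are assumptions for illustration, not the PR's 
exact code; they use only the publisher, channel, and managedChannel fields declared 
in the quoted class.

{code}
// Hedged sketch of the truncated cleanup; bodies are assumptions.
private void shutdownPublisher() {
	try {
		if (publisher != null) {
			publisher.shutdown();
		}
	} catch (Exception e) {
		LOG.info("Shutting down Publisher failed.", e);
	}
}

private void shutdownTransportChannel() {
	if (channel == null) {
		return;
	}
	try {
		channel.close(); // TransportChannel is a closeable BackgroundResource
	} catch (Exception e) {
		LOG.info("Shutting down TransportChannel failed.", e);
	}
}

private void shutdownManagedChannel() {
	if (managedChannel == null) {
		return;
	}
	try {
		managedChannel.shutdown();
		managedChannel.awaitTermination(1000L, TimeUnit.MILLISECONDS);
	} catch (InterruptedException e) {
		LOG.info("Shutting down ManagedChannel failed.", e);
	}
}
{code}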
---

[GitHub] [flink] Xeli commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2019-03-09 Thread GitBox
Xeli commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added 
PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r263998190
 
 

 ##
 File path: flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSource.java
 ##
 @@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.gcp.pubsub;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.StoppableFunction;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.MultipleIdsMessageAcknowledgingSourceBase;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory;
+import org.apache.flink.util.Preconditions;
+
+import com.google.api.core.ApiFuture;
+import com.google.auth.Credentials;
+import com.google.cloud.pubsub.v1.Subscriber;
+import com.google.cloud.pubsub.v1.stub.SubscriberStub;
+import com.google.pubsub.v1.AcknowledgeRequest;
+import com.google.pubsub.v1.ProjectSubscriptionName;
+import com.google.pubsub.v1.PubsubMessage;
+import com.google.pubsub.v1.PullRequest;
+import com.google.pubsub.v1.PullResponse;
+import com.google.pubsub.v1.ReceivedMessage;
+import io.grpc.netty.shaded.io.netty.channel.EventLoopGroup;
+import io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoopGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.TimeUnit;
+
+import static com.google.cloud.pubsub.v1.SubscriptionAdminSettings.defaultCredentialsProviderBuilder;
+
+/**
+ * PubSub Source, this Source will consume PubSub messages from a subscription and Acknowledge them on the next checkpoint.
+ * This ensures every message will get acknowledged at least once.
+ */
+public class PubSubSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OUT, String, String>
+   implements ResultTypeQueryable<OUT>, ParallelSourceFunction<OUT>, StoppableFunction {
+   private static final Logger LOG = LoggerFactory.getLogger(PubSubSource.class);
+   protected final DeserializationSchema<OUT> deserializationSchema;
+   protected final PubSubSubscriberFactory pubSubSubscriberFactory;
+   protected final Credentials credentials;
+   protected final String projectSubscriptionName;
+   protected final int maxMessagesPerPull;
+
+   protected transient boolean deduplicateMessages;
+   protected transient SubscriberStub subscriber;
+   protected transient PullRequest pullRequest;
+   protected transient EventLoopGroup eventLoopGroup;
+
+   protected transient volatile boolean isRunning;
+   protected transient volatile ApiFuture<PullResponse> messagesFuture;
+
+   PubSubSource(DeserializationSchema<OUT> deserializationSchema, PubSubSubscriberFactory pubSubSubscriberFactory, Credentials credentials, String projectSubscriptionName, int maxMessagesPerPull) {
+   super(String.class);
+   this.deserializationSchema = deserializationSchema;
+   this.pubSubSubscriberFactory = pubSubSubscriberFactory;
+   this.credentials = credentials;
+   this.projectSubscriptionName = projectSubscriptionName;
+   this.maxMessagesPerPull = maxMessagesPerPull;
+   }
+
+   @Override
+   public void open(Configuration configuration) throws Exception {
+   super.open(configuration);
+   if (hasNoCheckpointingEnabled(getRuntimeContext())) {
+   throw new IllegalArgumentException("The PubSubSource REQUIRES Checkpointing to be enabled and " +
+   "the 

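The guard at the top of open() above exists because acknowledgements are tied to 
checkpoint completion, so the source cannot give its at-least-once guarantee without 
checkpointing. A plausible shape for the helper, assuming it inspects the 
StreamingRuntimeContext (an illustration, not necessarily the PR's exact body):

{code}
// Sketch: checkpointing is required because acks only happen once a checkpoint completes.
private boolean hasNoCheckpointingEnabled(RuntimeContext runtimeContext) {
	return !(runtimeContext instanceof StreamingRuntimeContext
		&& ((StreamingRuntimeContext) runtimeContext).isCheckpointingEnabled());
}
{code}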
[GitHub] [flink] Xeli commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2019-03-09 Thread GitBox
Xeli commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added 
PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r263997975
 
 

 ##
 File path: flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSource.java
 ##
 @@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.gcp.pubsub;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.StoppableFunction;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.MultipleIdsMessageAcknowledgingSourceBase;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory;
+import org.apache.flink.util.Preconditions;
+
+import com.google.api.core.ApiFuture;
+import com.google.auth.Credentials;
+import com.google.cloud.pubsub.v1.Subscriber;
+import com.google.cloud.pubsub.v1.stub.SubscriberStub;
+import com.google.pubsub.v1.AcknowledgeRequest;
+import com.google.pubsub.v1.ProjectSubscriptionName;
+import com.google.pubsub.v1.PubsubMessage;
+import com.google.pubsub.v1.PullRequest;
+import com.google.pubsub.v1.PullResponse;
+import com.google.pubsub.v1.ReceivedMessage;
+import io.grpc.netty.shaded.io.netty.channel.EventLoopGroup;
+import io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoopGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.TimeUnit;
+
+import static com.google.cloud.pubsub.v1.SubscriptionAdminSettings.defaultCredentialsProviderBuilder;
+
+/**
+ * PubSub Source, this Source will consume PubSub messages from a subscription and Acknowledge them on the next checkpoint.
+ * This ensures every message will get acknowledged at least once.
+ */
+public class PubSubSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OUT, String, String>
+   implements ResultTypeQueryable<OUT>, ParallelSourceFunction<OUT>, StoppableFunction {
+   private static final Logger LOG = LoggerFactory.getLogger(PubSubSource.class);
+   protected final DeserializationSchema<OUT> deserializationSchema;
+   protected final PubSubSubscriberFactory pubSubSubscriberFactory;
+   protected final Credentials credentials;
+   protected final String projectSubscriptionName;
+   protected final int maxMessagesPerPull;
+
+   protected transient boolean deduplicateMessages;
+   protected transient SubscriberStub subscriber;
+   protected transient PullRequest pullRequest;
+   protected transient EventLoopGroup eventLoopGroup;
+
+   protected transient volatile boolean isRunning;
+   protected transient volatile ApiFuture<PullResponse> messagesFuture;
+
+   PubSubSource(DeserializationSchema<OUT> deserializationSchema, PubSubSubscriberFactory pubSubSubscriberFactory, Credentials credentials, String projectSubscriptionName, int maxMessagesPerPull) {
+   super(String.class);
+   this.deserializationSchema = deserializationSchema;
+   this.pubSubSubscriberFactory = pubSubSubscriberFactory;
+   this.credentials = credentials;
+   this.projectSubscriptionName = projectSubscriptionName;
+   this.maxMessagesPerPull = maxMessagesPerPull;
+   }
+
+   @Override
+   public void open(Configuration configuration) throws Exception {
+   super.open(configuration);
+   if (hasNoCheckpointingEnabled(getRuntimeContext())) {
+   throw new IllegalArgumentException("The PubSubSource REQUIRES Checkpointing to be enabled and " +
+   "the 

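The transient pullRequest field above is presumably built once in open() from the 
constructor arguments; a sketch using the standard Pub/Sub v1 builder (an assumption 
about the wiring, not a quote from the PR):

{code}
// Sketch (assumption, not the PR's exact code): one reusable PullRequest,
// capped at maxMessagesPerPull messages per round trip.
private PullRequest createPullRequest() {
	return PullRequest.newBuilder()
		.setMaxMessages(maxMessagesPerPull)
		.setSubscription(projectSubscriptionName)
		.build();
}
{code}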
[jira] [Commented] (FLINK-11539) Add TypeSerializerSnapshot for TraversableSerializer

2019-03-09 Thread Aljoscha Krettek (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16788620#comment-16788620
 ] 

Aljoscha Krettek commented on FLINK-11539:
--

The solution [~igalshilman] and I were thinking of was to employ a cache in the 
{{compileCbf}} method here: 
https://github.com/apache/flink/blob/master/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala.
 The cache would likely have to be keyed by the code itself (which is a string) 
and the current Thread or context ClassLoader (because that is used in the 
method for compilation).
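
As a sketch of the idea (illustrative names, not Flink's actual classes): a concurrent 
map keyed by the code string plus the ClassLoader, so each (code, loader) pair is 
compiled at most once. With such a cache, repeated job translations that keep hitting 
the same few generated snippets become map lookups instead of compiler invocations.

{code}
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

final class CompiledCodeCache<T> {

	private static final class Key {
		final String code;
		final ClassLoader classLoader;

		Key(String code, ClassLoader classLoader) {
			this.code = code;
			this.classLoader = classLoader;
		}

		@Override
		public boolean equals(Object o) {
			if (!(o instanceof Key)) {
				return false;
			}
			Key other = (Key) o;
			// Identity comparison on the loader: the same code compiled by
			// different loaders yields different classes.
			return code.equals(other.code) && classLoader == other.classLoader;
		}

		@Override
		public int hashCode() {
			return Objects.hash(code, System.identityHashCode(classLoader));
		}
	}

	private final ConcurrentHashMap<Key, T> cache = new ConcurrentHashMap<>();

	T getOrCompile(String code, ClassLoader loader, Function<String, T> compiler) {
		return cache.computeIfAbsent(new Key(code, loader), k -> compiler.apply(k.code));
	}
}
{code}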

> Add TypeSerializerSnapshot for TraversableSerializer
> 
>
> Key: FLINK-11539
> URL: https://issues.apache.org/jira/browse/FLINK-11539
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Type Serialization System
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> This will replace the deprecated {{TypeSerializerConfigSnapshot}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11539) Add TypeSerializerSnapshot for TraversableSerializer

2019-03-09 Thread JIRA


[ 
https://issues.apache.org/jira/browse/FLINK-11539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16788622#comment-16788622
 ] 

Jürgen Kreileder commented on FLINK-11539:
--

That probably would help a lot as it's the same few types over and over again.

> Add TypeSerializerSnapshot for TraversableSerializer
> 
>
> Key: FLINK-11539
> URL: https://issues.apache.org/jira/browse/FLINK-11539
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Type Serialization System
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> This will replace the deprecated {{TypeSerializerConfigSnapshot}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11539) Add TypeSerializerSnapshot for TraversableSerializer

2019-03-09 Thread Aljoscha Krettek (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16788616#comment-16788616
 ] 

Aljoscha Krettek commented on FLINK-11539:
--

[~jkreileder] we thought that it could have a slow-down but weren't expecting 
this much. Does your job have a lot of data types that have a 
{{TraversableSerializer}} in them?

> Add TypeSerializerSnapshot for TraversableSerializer
> 
>
> Key: FLINK-11539
> URL: https://issues.apache.org/jira/browse/FLINK-11539
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Type Serialization System
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> This will replace the deprecated {{TypeSerializerConfigSnapshot}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11857) Introduce BinaryExternalSorter and BufferedKVExternalSorter to batch table runtime

2019-03-09 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11857?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-11857:
---
Labels: pull-request-available  (was: )

> Introduce BinaryExternalSorter and BufferedKVExternalSorter to batch table 
> runtime
> --
>
> Key: FLINK-11857
> URL: https://issues.apache.org/jira/browse/FLINK-11857
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Operators
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
>  Labels: pull-request-available
>
> We need a sorter to take full advantage of the high performance of the Binary 
> format.
> For Sort, Sort Aggregation, SortMergeJoin: we need a BinaryExternalSorter to 
> sort BinaryRows.
> For Hash Aggregation: We store the data in a HashMap in KeyValue format. When 
> memory is not enough, we spill all the data in memory onto disk and 
> degenerate it into Sort Aggregation. So we need a BufferedKVExternalSorter to 
> write the data that is already in memory to disk, and then carry out Sort Merge.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] flinkbot commented on issue #7945: [FLINK-11857][table-runtime-blink] Introduce BinaryExternalSorter and BufferedKVExternalSorter to batch table runtime

2019-03-09 Thread GitBox
flinkbot commented on issue #7945: [FLINK-11857][table-runtime-blink] Introduce 
BinaryExternalSorter and BufferedKVExternalSorter to batch table runtime
URL: https://github.com/apache/flink/pull/7945#issuecomment-471163207
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from a reviewer.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi opened a new pull request #7945: [FLINK-11857][table-runtime-blink] Introduce BinaryExternalSorter and BufferedKVExternalSorter to batch table runtime

2019-03-09 Thread GitBox
JingsongLi opened a new pull request #7945: [FLINK-11857][table-runtime-blink] 
Introduce BinaryExternalSorter and BufferedKVExternalSorter to batch table 
runtime
URL: https://github.com/apache/flink/pull/7945
 
 
   ## What is the purpose of the change
   
   We need a sorter to take full advantage of the high performance of the 
Binary format.
   For Sort, Sort Aggregation, SortMergeJoin: we need a BinaryExternalSorter to 
sort BinaryRows.
   For Hash Aggregation: We store the data in a HashMap in KeyValue format. When 
memory is not enough, we spill all the data in memory onto disk and degenerate 
it into Sort Aggregation. So we need a BufferedKVExternalSorter to write the 
data that is already in memory to disk, and then carry out Sort Merge.
   
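   As a rough sketch of the degradation path described above (illustrative names 
only, not the Blink runtime's actual classes):

{code}
import java.util.HashMap;
import java.util.Map;

// Sketch: hash aggregation that falls back to sort-based aggregation by
// spilling its in-memory key/values to disk as sorted runs when memory runs out.
final class SpillingHashAggSketch {

	interface ExternalKVSorter {
		void add(String key, long value);          // buffer a record, spilling sorted runs as needed
		void sortAndSpill(Map<String, Long> kvs);  // flush a whole in-memory map as one sorted run
	}

	private final Map<String, Long> hashMap = new HashMap<>();
	private final ExternalKVSorter sorter;
	private final long memoryBudget;
	private long usedMemory;
	private boolean degradedToSortAgg;

	SpillingHashAggSketch(ExternalKVSorter sorter, long memoryBudget) {
		this.sorter = sorter;
		this.memoryBudget = memoryBudget;
	}

	void add(String key, long value, long recordSize) {
		if (!degradedToSortAgg && usedMemory + recordSize > memoryBudget) {
			// Memory exhausted: write the in-memory KVs to disk as a sorted run
			// and continue as sort-based aggregation.
			sorter.sortAndSpill(hashMap);
			hashMap.clear();
			usedMemory = 0;
			degradedToSortAgg = true;
		}
		if (degradedToSortAgg) {
			sorter.add(key, value);
		} else {
			hashMap.merge(key, value, Long::sum);
			usedMemory += recordSize;
		}
	}
	// The final aggregates then come from a sort-merge over all spilled runs.
}
{code}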
   ## Verifying this change
   
   unit tests & coverage
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (BinaryRowSerializer and BaseRowSerializer)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes)
 - If yes, how is the feature documented? (JavaDocs)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-11857) Introduce BinaryExternalSorter and BufferedKVExternalSorter to batch table runtime

2019-03-09 Thread Jingsong Lee (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11857?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee updated FLINK-11857:
-
Summary: Introduce BinaryExternalSorter and BufferedKVExternalSorter to 
batch table runtime  (was: Introduce BinaryExternalSorter to batch table 
runtime)

> Introduce BinaryExternalSorter and BufferedKVExternalSorter to batch table 
> runtime
> --
>
> Key: FLINK-11857
> URL: https://issues.apache.org/jira/browse/FLINK-11857
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Operators
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
>
> We need a sorter to take full advantage of the high performance of the Binary 
> format.
> For Sort, Sort Aggregation, SortMergeJoin: we need a BinaryExternalSorter to 
> sort BinaryRows.
> For Hash Aggregation: We store the data in a HashMap in KeyValue format. When 
> memory is not enough, we spill all the data in memory onto disk and 
> degenerate it into Sort Aggregation. So we need a BufferedKVExternalSorter to 
> write the data that is already in memory to disk, and then carry out Sort Merge.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11857) Introduce BinaryExternalSorter to batch table runtime

2019-03-09 Thread Jingsong Lee (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11857?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee updated FLINK-11857:
-
Description: 
We need a sorter to take full advantage of the high performance of the Binary 
format.

For Sort, Sort Aggregation, SortMergeJoin: we need a BinaryExternalSorter to 
sort BinaryRows.

For Hash Aggregation: We store the data in a HashMap in KeyValue format. When 
memory is not enough, we spill all the data in memory onto disk and degenerate 
it into Sort Aggregation. So we need a BufferedKVExternalSorter to write the 
data that is already in memory to disk, and then carry out Sort Merge.

  was:We need a sorter to take full advantage of the high performance of the 
Binary format.


> Introduce BinaryExternalSorter to batch table runtime
> -
>
> Key: FLINK-11857
> URL: https://issues.apache.org/jira/browse/FLINK-11857
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Operators
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
>
> We need a sorter to take full advantage of the high performance of the Binary 
> format.
> For Sort, Sort Aggregation, SortMergeJoin: we need a BinaryExternalSorter to 
> sort BinaryRows.
> For Hash Aggregation: We store the data in a HashMap in KeyValue format. When 
> memory is not enough, we spill all the data in memory onto disk and 
> degenerate it into Sort Aggregation. So we need a BufferedKVExternalSorter to 
> write the data that is already in memory to disk, and then carry out Sort Merge.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] KurtYoung commented on a change in pull request #7944: [FLINK-11863][table-runtime-blink] Introduce channel to read and write compressed data

2019-03-09 Thread GitBox
KurtYoung commented on a change in pull request #7944: 
[FLINK-11863][table-runtime-blink] Introduce channel to read and write 
compressed data
URL: https://github.com/apache/flink/pull/7944#discussion_r263992514
 
 

 ##
 File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/io/CompressedBlockChannelReader.java
 ##
 @@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.io;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
+import org.apache.flink.runtime.io.disk.iomanager.BufferFileReader;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.RequestDoneCallback;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.table.runtime.compression.BlockCompressionFactory;
+import org.apache.flink.table.runtime.compression.BlockCompressor;
+import org.apache.flink.table.runtime.compression.BlockDecompressor;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Compressed block channel reader provides a scenario where MemorySegment must be maintained.
+ */
+public class CompressedBlockChannelReader
+   implements BlockChannelReader<MemorySegment>, RequestDoneCallback<Buffer>, BufferRecycler {
+
+   private final LinkedBlockingQueue<MemorySegment> blockQueue;
+   private final boolean copyCompress;
+   private final BlockDecompressor decompressor;
+   private final BufferFileReader reader;
+   private final AtomicReference<IOException> cause;
+   private final LinkedBlockingQueue<Buffer> retBuffers = new LinkedBlockingQueue<>();
+
+   private byte[] buf;
+   private ByteBuffer bufWrapper;
+   private int offset;
+   private int len;
+
+   public CompressedBlockChannelReader(
+   IOManager ioManager,
+   ID channel,
+   LinkedBlockingQueue<MemorySegment> blockQueue,
+   BlockCompressionFactory codecFactory,
+   int preferBlockSize,
+   int segmentSize) throws IOException {
+   this.reader = ioManager.createBufferFileReader(channel, this);
+   this.blockQueue = blockQueue;
+   copyCompress = preferBlockSize > segmentSize * 2;
+   int blockSize = copyCompress ? preferBlockSize : segmentSize;
+   this.decompressor = codecFactory.getDecompressor();
+   cause = new AtomicReference<>();
+
+   if (copyCompress) {
+   this.buf = new byte[blockSize];
+   this.bufWrapper = ByteBuffer.wrap(buf);
+   }
+
+   BlockCompressor compressor = codecFactory.getCompressor();
+   for (int i = 0; i < 2; i++) {
+   MemorySegment segment = MemorySegmentFactory.wrap(new byte[compressor.getMaxCompressedSize(blockSize)]);
+   reader.readInto(new NetworkBuffer(segment, this));
+   }
+   }
+
+   @Override
+   public void readBlock(MemorySegment segment) throws IOException {
+   if (cause.get() != null) {
+   throw cause.get();
+   }
+
+   if (copyCompress) {
+   int readOffset = 0;
+   int readLen = segment.size();
+
+   while (readLen > 0) {
+   int copy = Math.min(readLen, len - offset);
+   if (copy == 0) {
+   readBuffer();
+   } else {
+   segment.put(readOffset, buf, offset, copy);
+   

[jira] [Updated] (FLINK-11856) Introduce BinaryHashTable to batch table runtime

2019-03-09 Thread Kurt Young (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11856?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young updated FLINK-11856:
---
Summary: Introduce BinaryHashTable to batch table runtime  (was: Introduce 
BinaryHashTable and LongHashTable to batch table runtime)

> Introduce BinaryHashTable to batch table runtime
> 
>
> Key: FLINK-11856
> URL: https://issues.apache.org/jira/browse/FLINK-11856
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Operators
>Reporter: Kurt Young
>Assignee: Kurt Young
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)