[jira] [Commented] (FLINK-11824) Event-time attribute cannot have same name as in original format
[ https://issues.apache.org/jira/browse/FLINK-11824?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16788860#comment-16788860 ]

Hequn Cheng commented on FLINK-11824:
-------------------------------------

+1. It would be better to be able to have the same name as the original field. It is also aligned with the behavior of {{StreamTableSource}} with {{DefinedRowtimeAttributes}}.

> Event-time attribute cannot have same name as in original format
> ----------------------------------------------------------------
>
>                 Key: FLINK-11824
>                 URL: https://issues.apache.org/jira/browse/FLINK-11824
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Table SQL
>    Affects Versions: 1.7.2, 1.8.0
>            Reporter: Fabian Hueske
>            Priority: Minor
>
> When a table is defined, event-time attributes are typically defined by
> linking them to an existing field in the original format (e.g., CSV, Avro,
> JSON, ...). However, right now, the event-time attribute in the defined table
> cannot have the same name as the original field.
> The following table definition fails with an exception:
> {code}
> // set up execution environment
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val tEnv = StreamTableEnvironment.create(env)
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>
> val names: Array[String] = Array("name", "t")
> val types: Array[TypeInformation[_]] = Array(Types.STRING, Types.LONG)
>
> tEnv.connect(new Kafka()
>     .version("universal")
>     .topic("namesTopic")
>     .property("zookeeper.connect", "localhost:2181")
>     .property("bootstrap.servers", "localhost:9092")
>     .property("group.id", "testGroup"))
>   .withFormat(new Csv()
>     .schema(Types.ROW(names, types)))
>   .withSchema(new Schema()
>     .field("name", Types.STRING)
>     .field("t", Types.SQL_TIMESTAMP) // changing "t" to "t2" works
>       .rowtime(new Rowtime()
>         .timestampsFromField("t")
>         .watermarksPeriodicAscending()))
>   .inAppendMode()
>   .registerTableSource("Names")
> {code}
> {code}
> Exception in thread "main" org.apache.flink.table.api.ValidationException: Field 't' could not be resolved by the field mapping.
> 	at org.apache.flink.table.sources.TableSourceUtil$.org$apache$flink$table$sources$TableSourceUtil$$resolveInputField(TableSourceUtil.scala:491)
> 	at org.apache.flink.table.sources.TableSourceUtil$$anonfun$org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields$1.apply(TableSourceUtil.scala:521)
> 	at org.apache.flink.table.sources.TableSourceUtil$$anonfun$org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields$1.apply(TableSourceUtil.scala:521)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> 	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> 	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
> 	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> 	at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
> 	at org.apache.flink.table.sources.TableSourceUtil$.org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields(TableSourceUtil.scala:521)
> 	at org.apache.flink.table.sources.TableSourceUtil$.validateTableSource(TableSourceUtil.scala:127)
> 	at org.apache.flink.table.plan.schema.StreamTableSourceTable.<init>(StreamTableSourceTable.scala:33)
> 	at org.apache.flink.table.api.StreamTableEnvironment.registerTableSourceInternal(StreamTableEnvironment.scala:150)
> 	at org.apache.flink.table.api.TableEnvironment.registerTableSource(TableEnvironment.scala:541)
> 	at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.scala:47)
> {code}

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11870) Code refactor for some details
shiwuliang created FLINK-11870: -- Summary: Code refactor for some details Key: FLINK-11870 URL: https://issues.apache.org/jira/browse/FLINK-11870 Project: Flink Issue Type: Improvement Reporter: shiwuliang Recently, while reading the source code, I found a few places that could be improved and that do not seem to have been fully addressed yet, for example: * use computeIfAbsent * remove extra calls I think I can try to optimize them. These are only very small issues.
[GitHub] [flink] flinkbot commented on issue #7948: Code refactor for some details.
flinkbot commented on issue #7948: Code refactor for some details. URL: https://github.com/apache/flink/pull/7948#issuecomment-471254709
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.
## Review Progress
* ❓ 1. The [description] looks good.
* ❓ 2. There is [consensus] that the contribution should go into Flink.
* ❓ 3. Needs [attention] from.
* ❓ 4. The change fits into the overall [architecture].
* ❓ 5. Overall code [quality] is good.
Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
Bot commands
The @flinkbot bot supports the following commands:
- `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until `architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] carryxyh opened a new pull request #7948: Code refactor for some details.
carryxyh opened a new pull request #7948: Code refactor for some details. URL: https://github.com/apache/flink/pull/7948
## What is the purpose of the change
Code refactor for some details.
## Brief change log
* use `computeIfAbsent` to simplify the code.
* remove a duplicate call to `jobGraph.getJobID()`
## Verifying this change
This change is a trivial rework / code cleanup without any test coverage.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)
## Documentation
- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not documented)
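The `computeIfAbsent` cleanup named in the change log follows a standard JDK pattern; below is a minimal, Flink-independent sketch of it (the class, method, and map shape are illustrative, not taken from the PR):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ComputeIfAbsentExample {

    // Before: the verbose get-then-put dance the refactor removes
    static void addVerbose(Map<String, List<Integer>> m, String k, int v) {
        List<Integer> list = m.get(k);
        if (list == null) {
            list = new ArrayList<>();
            m.put(k, list);
        }
        list.add(v);
    }

    // After: computeIfAbsent creates the list only when the key is missing
    static void addConcise(Map<String, List<Integer>> m, String k, int v) {
        m.computeIfAbsent(k, key -> new ArrayList<>()).add(v);
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> m = new HashMap<>();
        addConcise(m, "a", 1);
        addConcise(m, "a", 2);
        System.out.println(m.get("a")); // [1, 2]
    }
}
```

Besides being shorter, the `computeIfAbsent` form evaluates the default lazily and performs a single map lookup instead of two.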
[GitHub] [flink] Tom-Goong commented on a change in pull request #7947: [FLINK-11867][Build System]Mistaking in checking filePath's value
Tom-Goong commented on a change in pull request #7947: [FLINK-11867][Build System]Mistaking in checking filePath's value URL: https://github.com/apache/flink/pull/7947#discussion_r264022486
## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
## @@ -958,8 +959,7 @@ public TimeCharacteristic getStreamTimeCharacteristic() {
 * @return The data stream that represents the data read from the given file as text lines
 */
public DataStreamSource readTextFile(String filePath, String charsetName) {
-	Preconditions.checkNotNull(filePath, "The file path must not be null.");
-	Preconditions.checkNotNull(filePath.isEmpty(), "The file path must not be empty.");
+	Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(filePath), "The file path must not be blank.");
Review comment: > Good catch and thanks for your contribution. Only two minor comments left, please consider to modify the error message. ok, have modified.
[jira] [Created] (FLINK-11869) [checkpoint] Make buffer size in checkpoint stream factory configurable
Yun Tang created FLINK-11869: Summary: [checkpoint] Make buffer size in checkpoint stream factory configurable Key: FLINK-11869 URL: https://issues.apache.org/jira/browse/FLINK-11869 Project: Flink Issue Type: Improvement Components: Runtime / Checkpointing Reporter: Yun Tang Assignee: Yun Tang Fix For: 1.9.0 Currently, the default buffer size for {{FsCheckpointStateOutputStream}} is only 4KB, which causes a lot of IOPS if the stream is large. Unfortunately, when users want to checkpoint to a fully disaggregated file system that has no data node running on the local machine, they might face an IOPS limit or be unable to serve many IOPS at a time. This can make checkpoint durations very long and cause checkpoints to expire often. To increase this buffer size today, we have to increase {{fileStateThreshold}}, which only indirectly increases the buffer size. However, as we all know, too many not-so-small {{ByteStreamStateHandle}}s returned to the checkpoint coordinator can easily cause job manager OOM and a large checkpoint meta file. We should instead make the buffer size itself configurable.
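The effect of a larger, configurable write buffer can be sketched with plain JDK streams. This is only an illustration of the idea, not Flink's `FsCheckpointStateOutputStream`; the `wrap` helper and the 64 KB value are assumptions for the example:

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class BufferSizeExample {

    // Hypothetical default mirroring the 4 KB buffer mentioned in the issue
    static final int DEFAULT_BUFFER_SIZE = 4 * 1024;

    // A larger buffer means fewer flushes to the underlying stream,
    // i.e. fewer write operations (IOPS) against the file system.
    static BufferedOutputStream wrap(ByteArrayOutputStream target, int bufferSize) {
        return new BufferedOutputStream(target, bufferSize);
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        BufferedOutputStream out = wrap(sink, 64 * 1024); // configurable, e.g. 64 KB

        out.write(new byte[10_000]);
        // Nothing has reached the sink yet: the 10,000 bytes fit in the buffer,
        // so no write was issued to the underlying "file system".
        System.out.println(sink.size()); // 0
        out.flush();
        System.out.println(sink.size()); // 10000
    }
}
```

With the 4 KB default, the same 10,000-byte write would have gone through in multiple flushes; the buffer size directly controls how many physical writes the underlying store sees.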
[jira] [Updated] (FLINK-11868) [filesystems] Introduce listStatusIterator API to file system
[ https://issues.apache.org/jira/browse/FLINK-11868?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang updated FLINK-11868: - Description:
From past experience, we know {{listStatus}} is expensive for many distributed file systems, especially when the folder contains too many files. This method not only blocks the thread until the result is returned but can also cause OOM because the returned array of {{FileStatus}} can be really large. We should already have learned this from FLINK-7266 and FLINK-8540. However, listing the file status under a path is really helpful in many situations. Thankfully, many distributed file systems have noticed this and provide APIs such as {{[listStatusIterator|https://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FileSystem.html#listStatusIterator(org.apache.hadoop.fs.Path)]}} to call the file system on demand. We should also introduce this API and replace the current implementation, which uses {{listStatus}}.

was:
From past experience, we know {{listStatus}} is expensive for many distributed file systems, especially when the folder contains too many files. This method not only blocks the thread until the result is returned but can also cause OOM because the returned array of {{FileStatus}} can be really large. We should have learned this from FLINK-7266 and FLINK-8540. However, listing the file status under a path is really helpful in many situations. Thankfully, many distributed file systems have noticed this and provide APIs such as {{[listStatusIterator|https://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FileSystem.html#listStatusIterator(org.apache.hadoop.fs.Path)]}} to call the file system on demand. We should also introduce this API and replace the current implementation, which uses {{listStatus}}.

> [filesystems] Introduce listStatusIterator API to file system
> -------------------------------------------------------------
>
>             Key: FLINK-11868
>             URL: https://issues.apache.org/jira/browse/FLINK-11868
>         Project: Flink
>      Issue Type: Improvement
>      Components: FileSystems
>        Reporter: Yun Tang
>        Assignee: Yun Tang
>        Priority: Major
>         Fix For: 1.9.0
>
> From past experience, we know {{listStatus}} is expensive for many
> distributed file systems, especially when the folder contains too many files.
> This method not only blocks the thread until the result is returned but can
> also cause OOM because the returned array of {{FileStatus}} can be really large.
> We should already have learned this from FLINK-7266 and FLINK-8540.
> However, listing the file status under a path is really helpful in many situations.
> Thankfully, many distributed file systems have noticed this and provide APIs such as
> {{[listStatusIterator|https://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FileSystem.html#listStatusIterator(org.apache.hadoop.fs.Path)]}}
> to call the file system on demand.
>
> We should also introduce this API and replace the current implementation,
> which uses {{listStatus}}.
[jira] [Created] (FLINK-11868) [filesystems] Introduce listStatusIterator API to file system
Yun Tang created FLINK-11868: Summary: [filesystems] Introduce listStatusIterator API to file system Key: FLINK-11868 URL: https://issues.apache.org/jira/browse/FLINK-11868 Project: Flink Issue Type: Improvement Components: FileSystems Reporter: Yun Tang Assignee: Yun Tang Fix For: 1.9.0 From past experience, we know {{listStatus}} is expensive for many distributed file systems, especially when the folder contains too many files. This method not only blocks the thread until the result is returned but can also cause OOM because the returned array of {{FileStatus}} can be really large. We should have learned this from FLINK-7266 and FLINK-8540. However, listing the file status under a path is really helpful in many situations. Thankfully, many distributed file systems have noticed this and provide APIs such as {{[listStatusIterator|https://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FileSystem.html#listStatusIterator(org.apache.hadoop.fs.Path)]}} to call the file system on demand. We should also introduce this API and replace the current implementation, which uses {{listStatus}}.
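The eager-vs-lazy listing trade-off behind this proposal can be illustrated with the JDK's own file APIs. This sketch is not the proposed Flink `listStatusIterator`, only a JDK analogue showing why on-demand iteration avoids materializing the full listing:

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

public class ListIteratorExample {

    // Eager flavor: conceptually like FileSystem#listStatus returning a
    // FileStatus[] -- the whole listing is materialized before we can use it,
    // so memory grows with the number of entries.
    static long countEager(Path dir) throws IOException {
        try (Stream<Path> entries = Files.list(dir)) {
            return entries.count();
        }
    }

    // Lazy flavor: entries are fetched from the underlying file system on
    // demand as the iterator advances -- the behavior listStatusIterator
    // would provide -- so memory stays bounded even for huge folders.
    static long countLazy(Path dir) throws IOException {
        long count = 0;
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir)) {
            for (Path ignored : stream) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("list-demo");
        for (int i = 0; i < 3; i++) {
            Files.createFile(dir.resolve("file-" + i));
        }
        System.out.println(countEager(dir) + " " + countLazy(dir)); // 3 3
    }
}
```

Both flavors see the same entries; the difference is only when and how much of the listing is held in memory, which is exactly the OOM concern the issue raises.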
[GitHub] [flink] Myasuka commented on a change in pull request #7947: [FLINK-11867][Build System]Mistaking in checking filePath's value
Myasuka commented on a change in pull request #7947: [FLINK-11867][Build System]Mistaking in checking filePath's value URL: https://github.com/apache/flink/pull/7947#discussion_r264007594
## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
## @@ -958,8 +959,7 @@ public TimeCharacteristic getStreamTimeCharacteristic() {
 * @return The data stream that represents the data read from the given file as text lines
 */
public DataStreamSource readTextFile(String filePath, String charsetName) {
-	Preconditions.checkNotNull(filePath, "The file path must not be null.");
-	Preconditions.checkNotNull(filePath.isEmpty(), "The file path must not be empty.");
+	Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(filePath), "The file path must not be blank.");
Review comment: How about changing the error message to `The file path must not be null or blank.`
[GitHub] [flink] Myasuka commented on a change in pull request #7947: [FLINK-11867][Build System]Mistaking in checking filePath's value
Myasuka commented on a change in pull request #7947: [FLINK-11867][Build System]Mistaking in checking filePath's value URL: https://github.com/apache/flink/pull/7947#discussion_r264007604
## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
## @@ -1156,8 +1156,7 @@ public TimeCharacteristic getStreamTimeCharacteristic() {
	TypeInformation typeInformation) {
	Preconditions.checkNotNull(inputFormat, "InputFormat must not be null.");
-	Preconditions.checkNotNull(filePath, "The file path must not be null.");
-	Preconditions.checkNotNull(filePath.isEmpty(), "The file path must not be empty.");
+	Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(filePath), "The file path must not be blank.");
Review comment: Same here: how about changing the error message to `The file path must not be null or blank.`
[jira] [Updated] (FLINK-11865) Code generation in TraversableSerializer is prohibitively slow
[ https://issues.apache.org/jira/browse/FLINK-11865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-11865: --- Component/s: API / Type Serialization System > Code generation in TraversableSerializer is prohibitively slow > -- > > Key: FLINK-11865 > URL: https://issues.apache.org/jira/browse/FLINK-11865 > Project: Flink > Issue Type: Bug > Components: API / Type Serialization System >Reporter: Aljoscha Krettek >Priority: Blocker > Fix For: 1.8.0 > > > As discussed in FLINK-11539, the new code generation makes job > submissions/translation prohibitively slow. > The solution should be to introduce a Cache for the generated code.
[GitHub] [flink] flinkbot commented on issue #7947: [FLINK-11867][Build System]Mistaking in checking filePath's value
flinkbot commented on issue #7947: [FLINK-11867][Build System]Mistaking in checking filePath's value URL: https://github.com/apache/flink/pull/7947#issuecomment-471196640 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.
[jira] [Updated] (FLINK-11867) mistaking in checking filePath's value
[ https://issues.apache.org/jira/browse/FLINK-11867?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-11867: --- Labels: pull-request-available (was: )

> mistaking in checking filePath's value
> --------------------------------------
>
>             Key: FLINK-11867
>             URL: https://issues.apache.org/jira/browse/FLINK-11867
>         Project: Flink
>      Issue Type: Bug
>      Components: Build System
>        Reporter: Tom Goong
>        Assignee: Tom Goong
>        Priority: Major
>          Labels: pull-request-available
>
> In class StreamExecutionEnvironment
>
> {code:java}
> // code placeholder
> public DataStreamSource<String> readTextFile(String filePath, String charsetName) {
>     Preconditions.checkNotNull(filePath, "The file path must not be null.");
>     Preconditions.checkNotNull(filePath.isEmpty(), "The file path must not be empty.");
>
>     TextInputFormat format = new TextInputFormat(new Path(filePath));
>     format.setFilesFilter(FilePathFilter.createDefaultFilter());
>     TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
>     format.setCharsetName(charsetName);
>
>     return readFile(format, filePath, FileProcessingMode.PROCESS_ONCE, -1, typeInfo);
> }
> {code}
>
> The *Preconditions.checkNotNull(filePath.isEmpty(), "The file path must not be empty.");* check will not work.
[GitHub] [flink] Tom-Goong opened a new pull request #7947: [FLINK-11867][Build System]Mistaking in checking filePath's value
Tom-Goong opened a new pull request #7947: [FLINK-11867][Build System]Mistaking in checking filePath's value URL: https://github.com/apache/flink/pull/7947
## What is the purpose of the change
`Preconditions.checkNotNull(filePath, "The file path must not be null.");`
`Preconditions.checkNotNull(filePath.isEmpty(), "The file path must not be empty.");`
The second line of code has no effect.
## Brief change log
* Changed the check of the filePath's value and the hint message
## Does this pull request potentially affect one of the following parts:
* Dependencies (does it add or upgrade a dependency): **no**
* The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no**
* The serializers: **no**
* The runtime per-record code paths (performance sensitive): **no**
* Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
* The S3 file system connector: **no**
## Documentation
* Does this pull request introduce a new feature? **no**
[jira] [Updated] (FLINK-11867) mistaking in checking filePath's value
[ https://issues.apache.org/jira/browse/FLINK-11867?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tom Goong updated FLINK-11867: -- Description:
In class StreamExecutionEnvironment
{code:java}
// code placeholder
public DataStreamSource<String> readTextFile(String filePath, String charsetName) {
    Preconditions.checkNotNull(filePath, "The file path must not be null.");
    Preconditions.checkNotNull(filePath.isEmpty(), "The file path must not be empty.");

    TextInputFormat format = new TextInputFormat(new Path(filePath));
    format.setFilesFilter(FilePathFilter.createDefaultFilter());
    TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
    format.setCharsetName(charsetName);

    return readFile(format, filePath, FileProcessingMode.PROCESS_ONCE, -1, typeInfo);
}
{code}
The *Preconditions.checkNotNull(filePath.isEmpty(), "The file path must not be empty.");* check will not work.

was:
In class StreamExecutionEnvironment
{code:java}
// code placeholder
public DataStreamSource<String> readTextFile(String filePath, String charsetName) {
    Preconditions.checkNotNull(filePath, "The file path must not be null.");
    Preconditions.checkNotNull(filePath.isEmpty(), "The file path must not be empty.");

    TextInputFormat format = new TextInputFormat(new Path(filePath));
    format.setFilesFilter(FilePathFilter.createDefaultFilter());
    TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
    format.setCharsetName(charsetName);

    return readFile(format, filePath, FileProcessingMode.PROCESS_ONCE, -1, typeInfo);
}
{code}
The `Preconditions.checkNotNull(filePath.isEmpty(), "The file path must not be empty.");` check will not work.

> mistaking in checking filePath's value
> --------------------------------------
>
>             Key: FLINK-11867
>             URL: https://issues.apache.org/jira/browse/FLINK-11867
>         Project: Flink
>      Issue Type: Bug
>      Components: Build System
>        Reporter: Tom Goong
>        Assignee: Tom Goong
>        Priority: Major
>
> In class StreamExecutionEnvironment
>
> {code:java}
> // code placeholder
> public DataStreamSource<String> readTextFile(String filePath, String charsetName) {
>     Preconditions.checkNotNull(filePath, "The file path must not be null.");
>     Preconditions.checkNotNull(filePath.isEmpty(), "The file path must not be empty.");
>
>     TextInputFormat format = new TextInputFormat(new Path(filePath));
>     format.setFilesFilter(FilePathFilter.createDefaultFilter());
>     TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
>     format.setCharsetName(charsetName);
>
>     return readFile(format, filePath, FileProcessingMode.PROCESS_ONCE, -1, typeInfo);
> }
> {code}
>
> The *Preconditions.checkNotNull(filePath.isEmpty(), "The file path must not be empty.");* check will not work.
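Why the original check has no effect can be shown in isolation. In the sketch below, `checkNotNull` and `checkArgument` are re-implemented locally so the example is self-contained (they mirror, but are not, Flink's `Preconditions`), and `fixed` approximates the null-or-blank check the PR moves to:

```java
public class PathCheckExample {

    static <T> T checkNotNull(T ref, String msg) {
        if (ref == null) throw new NullPointerException(msg);
        return ref;
    }

    static void checkArgument(boolean cond, String msg) {
        if (!cond) throw new IllegalArgumentException(msg);
    }

    // Buggy: filePath.isEmpty() boxes to a Boolean, which is never null,
    // so the second check always passes -- an empty path slips through
    // (and a null path throws NPE from .isEmpty() itself, not the check).
    static void buggy(String filePath) {
        checkNotNull(filePath, "The file path must not be null.");
        checkNotNull(filePath.isEmpty(), "The file path must not be empty.");
    }

    // Fixed, in the spirit of the PR: reject null, empty, and whitespace-only
    static void fixed(String filePath) {
        checkArgument(filePath != null && !filePath.trim().isEmpty(),
            "The file path must not be null or blank.");
    }

    public static void main(String[] args) {
        buggy("");  // passes silently despite the empty path
        boolean rejected = false;
        try {
            fixed("   ");
        } catch (IllegalArgumentException e) {
            rejected = true;
        }
        System.out.println(rejected); // true
    }
}
```

The key point is that `checkNotNull` tests its argument for null-ness only; a boolean condition must go through `checkArgument` to have any effect.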
[jira] [Updated] (FLINK-11866) [py] Python API Data Types section documentation outdated
[ https://issues.apache.org/jira/browse/FLINK-11866?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-11866: --- Labels: pull-request-available (was: ) > [py] Python API Data Types section documentation outdated > - > > Key: FLINK-11866 > URL: https://issues.apache.org/jira/browse/FLINK-11866 > Project: Flink > Issue Type: Bug > Components: API / Python, Documentation >Affects Versions: 1.7.2 >Reporter: romano vacca >Priority: Minor > Labels: pull-request-available > > The documentation about the data types for the Batch Python API is outdated. > > {code:java} > env.register_custom_type(MyObj, MySerializer(), MyDeserializer()) > {code} > {{should be :}} > {code:java} > env.register_type(MyObj, MySerializer(), MyDeserializer()){code}
[GitHub] [flink] romanovacca opened a new pull request #7946: [FLINK-11866] documentation still shows old function
romanovacca opened a new pull request #7946: [FLINK-11866] documentation still shows old function URL: https://github.com/apache/flink/pull/7946 ## What is the purpose of the change *Improving the documentation* ## Brief change log - *changed the outdated example function **register_custom_type** to **register_type*** ## Verifying this change This change is a trivial rework / code cleanup without any test coverage.
[GitHub] [flink] flinkbot commented on issue #7946: [FLINK-11866] documentation still shows old function
flinkbot commented on issue #7946: [FLINK-11866] documentation still shows old function URL: https://github.com/apache/flink/pull/7946#issuecomment-471196060 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.
[jira] [Updated] (FLINK-11867) mistaking in checking filePath's value
[ https://issues.apache.org/jira/browse/FLINK-11867?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tom Goong updated FLINK-11867: -- Description:
In class StreamExecutionEnvironment
{code:java}
// code placeholder
public DataStreamSource<String> readTextFile(String filePath, String charsetName) {
    Preconditions.checkNotNull(filePath, "The file path must not be null.");
    Preconditions.checkNotNull(filePath.isEmpty(), "The file path must not be empty.");

    TextInputFormat format = new TextInputFormat(new Path(filePath));
    format.setFilesFilter(FilePathFilter.createDefaultFilter());
    TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
    format.setCharsetName(charsetName);

    return readFile(format, filePath, FileProcessingMode.PROCESS_ONCE, -1, typeInfo);
}
{code}
The `Preconditions.checkNotNull(filePath.isEmpty(), "The file path must not be empty.");` check will not work.

> mistaking in checking filePath's value
> --------------------------------------
>
>             Key: FLINK-11867
>             URL: https://issues.apache.org/jira/browse/FLINK-11867
>         Project: Flink
>      Issue Type: Bug
>      Components: Build System
>        Reporter: Tom Goong
>        Assignee: Tom Goong
>        Priority: Major
>
> In class StreamExecutionEnvironment
>
> {code:java}
> // code placeholder
> public DataStreamSource<String> readTextFile(String filePath, String charsetName) {
>     Preconditions.checkNotNull(filePath, "The file path must not be null.");
>     Preconditions.checkNotNull(filePath.isEmpty(), "The file path must not be empty.");
>
>     TextInputFormat format = new TextInputFormat(new Path(filePath));
>     format.setFilesFilter(FilePathFilter.createDefaultFilter());
>     TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
>     format.setCharsetName(charsetName);
>
>     return readFile(format, filePath, FileProcessingMode.PROCESS_ONCE, -1, typeInfo);
> }
> {code}
>
> The `Preconditions.checkNotNull(filePath.isEmpty(), "The file path must not be empty.");` check will not work.
[jira] [Updated] (FLINK-11866) [py] Python API Data Types section documentation outdated
[ https://issues.apache.org/jira/browse/FLINK-11866?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] romano vacca updated FLINK-11866: - Summary: [py] Python API Data Types section documentation outdated (was: [py] Pathon API Data Types section documentation outdated) > [py] Python API Data Types section documentation outdated > - > > Key: FLINK-11866 > URL: https://issues.apache.org/jira/browse/FLINK-11866 > Project: Flink > Issue Type: Bug > Components: API / Python, Documentation >Affects Versions: 1.7.2 >Reporter: romano vacca >Priority: Minor > > The documentation about the data types for the Batch Python API is outdated. > > {code:java} > env.register_custom_type(MyObj, MySerializer(), MyDeserializer()) > {code} > {{should be :}} > {code:java} > env.register_type(MyObj, MySerializer(), MyDeserializer()){code}
[jira] [Created] (FLINK-11867) mistaking in checking filePath's value
Tom Goong created FLINK-11867:

Summary: mistaking in checking filePath's value
Key: FLINK-11867
URL: https://issues.apache.org/jira/browse/FLINK-11867
Project: Flink
Issue Type: Bug
Components: Build System
Reporter: Tom Goong
Assignee: Tom Goong
[jira] [Created] (FLINK-11866) [py] Pathon API Data Types section documentation outdated
romano vacca created FLINK-11866:

Summary: [py] Pathon API Data Types section documentation outdated
Key: FLINK-11866
URL: https://issues.apache.org/jira/browse/FLINK-11866
Project: Flink
Issue Type: Bug
Components: API / Python, Documentation
Affects Versions: 1.7.2
Reporter: romano vacca

The documentation about the data types for the Batch Python API is outdated.

{code:java}
env.register_custom_type(MyObj, MySerializer(), MyDeserializer())
{code}

should be:

{code:java}
env.register_type(MyObj, MySerializer(), MyDeserializer())
{code}
[GitHub] [flink] Xeli commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
Xeli commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r263999310

## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SerializedCheckpointData.java
##
@@ -140,20 +145,21 @@ public int getNumIds() {
 //
 /**
-* De-serializes an array of SerializedCheckpointData back into an ArrayDeque of element checkpoints.
+* De-serializes an array of SerializedCheckpointData back into
+* a Map of element checkpoints with the checkpointId as key.
 *
 * @param data The data to be deserialized.
 * @param serializer The serializer used to deserialize the data.
 * @param <T> The type of the elements.
-* @return An ArrayDeque of element checkpoints.
+* @return A Map of element checkpoints.
 *
 * @throws IOException Thrown, if the serialization fails.
 */
- public static <T> ArrayDeque<Tuple2<Long, List<T>>> toDeque(
+ public static <T> Map<Long, List<T>> toDeque(

Review comment: Yeah good catch, changed it to `toMap()`

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] Xeli commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
Xeli commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r263999268

## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SerializedCheckpointData.java
##
@@ -172,9 +178,44 @@ public int getNumIds() {
 ids.add(serializer.deserialize(deser));
 }
- deque.addLast(new Tuple2<Long, List<T>>(checkpoint.checkpointId, ids));
+ map.put(checkpoint.checkpointId, ids);
 }
+ return map;
+ }
+
+ /**
+* Combines multiple ArrayDeques with checkpoint data by checkpointId.
+* This could happen when a job rescales to a lower parallelism and the states of multiple tasks are combined.
+*
+* @param data The data to be combined.
+* @param <T> The type of the elements.
+* @return An ArrayDeque of combined element checkpoints.
+*/
+ public static <T> ArrayDeque<Tuple2<Long, List<T>>> combine(List<ArrayDeque<Tuple2<Long, List<T>>>> data) {
+ Map<Long, List<T>> accumulator = new HashMap<>();

Review comment: Changed it :)
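The `combine` method under review merges per-subtask checkpoint state after a rescale to lower parallelism, concatenating the id collections that share a checkpoint id. A hedged sketch of that merge logic, using plain `Map`-based types for self-containment (the actual Flink code accumulates into a `HashMap` keyed by checkpointId, with `Tuple2` entries rather than map entries):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CheckpointCombiner {
    /**
     * Merges several per-subtask views of (checkpointId -> pending ids) into one,
     * concatenating the id lists of entries that share a checkpoint id.
     */
    public static <T> Map<Long, List<T>> combine(List<Map<Long, List<T>>> perSubtask) {
        Map<Long, List<T>> accumulator = new HashMap<>();
        for (Map<Long, List<T>> partial : perSubtask) {
            for (Map.Entry<Long, List<T>> entry : partial.entrySet()) {
                // computeIfAbsent creates the bucket on first sight of a checkpoint id.
                accumulator.computeIfAbsent(entry.getKey(), k -> new ArrayList<>())
                           .addAll(entry.getValue());
            }
        }
        return accumulator;
    }
}
```

With two subtasks both holding ids for checkpoint 1, the merged view holds the union of both lists under key 1.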
[GitHub] [flink] Xeli commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
Xeli commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r263999125 ## File path: flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSource.java ## @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.gcp.pubsub; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.functions.StoppableFunction; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.source.MultipleIdsMessageAcknowledgingSourceBase; +import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory; +import org.apache.flink.util.Preconditions; + +import com.google.api.core.ApiFuture; +import com.google.auth.Credentials; +import com.google.cloud.pubsub.v1.Subscriber; +import com.google.cloud.pubsub.v1.stub.SubscriberStub; +import com.google.pubsub.v1.AcknowledgeRequest; +import com.google.pubsub.v1.ProjectSubscriptionName; +import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.PullRequest; +import com.google.pubsub.v1.PullResponse; +import com.google.pubsub.v1.ReceivedMessage; +import io.grpc.netty.shaded.io.netty.channel.EventLoopGroup; +import io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoopGroup; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CancellationException; +import java.util.concurrent.TimeUnit; + +import static com.google.cloud.pubsub.v1.SubscriptionAdminSettings.defaultCredentialsProviderBuilder; + +/** + * PubSub Source, this Source will consume PubSub messages from a subscription and Acknowledge them on the next checkpoint. + * This ensures every message will get acknowledged at least once. 
+ */ +public class PubSubSource extends MultipleIdsMessageAcknowledgingSourceBase + implements ResultTypeQueryable, ParallelSourceFunction, StoppableFunction { + private static final Logger LOG = LoggerFactory.getLogger(PubSubSource.class); + protected final DeserializationSchema deserializationSchema; + protected final PubSubSubscriberFactory pubSubSubscriberFactory; + protected final Credentials credentials; + protected final String projectSubscriptionName; + protected final int maxMessagesPerPull; + + protected transient boolean deduplicateMessages; + protected transient SubscriberStub subscriber; + protected transient PullRequest pullRequest; + protected transient EventLoopGroup eventLoopGroup; + + protected transient volatile boolean isRunning; + protected transient volatile ApiFuture messagesFuture; + + PubSubSource(DeserializationSchema deserializationSchema, PubSubSubscriberFactory pubSubSubscriberFactory, Credentials credentials, String projectSubscriptionName, int maxMessagesPerPull) { + super(String.class); + this.deserializationSchema = deserializationSchema; + this.pubSubSubscriberFactory = pubSubSubscriberFactory; + this.credentials = credentials; + this.projectSubscriptionName = projectSubscriptionName; + this.maxMessagesPerPull = maxMessagesPerPull; + } + + @Override + public void open(Configuration configuration) throws Exception { + super.open(configuration); + if (hasNoCheckpointingEnabled(getRuntimeContext())) { + throw new IllegalArgumentException("The PubSubSource REQUIRES Checkpointing to be enabled and " + + "the
[GitHub] [flink] Xeli commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
Xeli commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r263999050

## File path: docs/dev/connectors/pubsub.md
##
@@ -0,0 +1,142 @@
+---
+title: "Google PubSub"
+nav-title: PubSub
+nav-parent_id: connectors
+nav-pos: 7
+---
+
+This connector provides a Source and Sink that can read from and write to
+[Google PubSub](https://cloud.google.com/pubsub). To use this connector, add the
+following dependency to your project:
+
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-gcp-pubsub{{ site.scala_version_suffix }}</artifactId>
+  <version>{{ site.version }}</version>
+</dependency>
+{% endhighlight %}
+
+Note that the streaming connectors are currently not part of the binary
+distribution. See
+[here]({{ site.baseurl }}/dev/projectsetup/dependencies.html)
+for information about how to package the program with the libraries for
+cluster execution.
+
+## Consuming or Producing PubSubMessages
+
+The connector provides connectors for receiving and sending messages from and to Google PubSub.
+Google PubSub has an `Atleast-Once` guarantee and as such the connector delivers the same guarantee.
+
+### PubSub SourceFunction

Review comment: Yeah makes sense! I've added it.
[GitHub] [flink] Xeli commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
Xeli commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r263998745 ## File path: flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSink.java ## @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.gcp.pubsub; + +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; + +import com.google.api.gax.core.FixedCredentialsProvider; +import com.google.api.gax.core.NoCredentialsProvider; +import com.google.api.gax.grpc.GrpcTransportChannel; +import com.google.api.gax.rpc.FixedTransportChannelProvider; +import com.google.api.gax.rpc.TransportChannel; +import com.google.auth.Credentials; +import com.google.cloud.pubsub.v1.Publisher; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.ProjectTopicName; +import com.google.pubsub.v1.PubsubMessage; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +import static com.google.cloud.pubsub.v1.SubscriptionAdminSettings.defaultCredentialsProviderBuilder; + +/** + * A sink function that outputs to PubSub. 
+ * + * @param type of PubSubSink messages to write + */ +public class PubSubSink extends RichSinkFunction implements CheckpointedFunction { + private static final Logger LOG = LoggerFactory.getLogger(PubSubSink.class); + + private final Credentials credentials; + private final SerializationSchema serializationSchema; + private final String projectName; + private final String topicName; + private final String hostAndPortForEmulator; + + private transient Publisher publisher; + + private PubSubSink( + Credentials credentials, + SerializationSchema serializationSchema, + String projectName, + String topicName, + String hostAndPortForEmulator) { + this.credentials = credentials; + this.serializationSchema = serializationSchema; + this.projectName = projectName; + this.topicName = topicName; + this.hostAndPortForEmulator = hostAndPortForEmulator; + } + + private transient ManagedChannel managedChannel = null; + private transient TransportChannel channel = null; + + @Override + public void open(Configuration configuration) throws Exception { + Publisher.Builder builder = Publisher + .newBuilder(ProjectTopicName.of(projectName, topicName)) + .setCredentialsProvider(FixedCredentialsProvider.create(credentials)); + + if (hostAndPortForEmulator != null) { + managedChannel = ManagedChannelBuilder + .forTarget(hostAndPortForEmulator) + .usePlaintext(true) // This is 'Ok' because this is ONLY used for testing. + .build(); + channel = GrpcTransportChannel.newBuilder().setManagedChannel(managedChannel).build(); + builder.setChannelProvider(FixedTransportChannelProvider.create(channel)) + .setCredentialsProvider(NoCredentialsProvider.create()); + } + + publisher = builder.build(); + } + + @Override + public void close() throws Exception { + super.close(); + shutdownPublisher(); + shutdownTransportCh
[jira] [Commented] (FLINK-11539) Add TypeSerializerSnapshot for TraversableSerializer
[ https://issues.apache.org/jira/browse/FLINK-11539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16788666#comment-16788666 ] Aljoscha Krettek commented on FLINK-11539: -- I created FLINK-11865

> Add TypeSerializerSnapshot for TraversableSerializer
> --
>
> Key: FLINK-11539
> URL: https://issues.apache.org/jira/browse/FLINK-11539
> Project: Flink
> Issue Type: Improvement
> Components: API / Type Serialization System
> Reporter: Aljoscha Krettek
> Assignee: Aljoscha Krettek
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.8.0
>
> Time Spent: 20m
> Remaining Estimate: 0h
>
> This will replace the deprecated {{TypeSerializerConfigSnapshot}}.
[jira] [Created] (FLINK-11865) Code generation in TraversableSerializer is prohibitively slow
Aljoscha Krettek created FLINK-11865:

Summary: Code generation in TraversableSerializer is prohibitively slow
Key: FLINK-11865
URL: https://issues.apache.org/jira/browse/FLINK-11865
Project: Flink
Issue Type: Bug
Reporter: Aljoscha Krettek
Fix For: 1.8.0

As discussed in FLINK-11539, the new code generation makes job submissions/translation prohibitively slow. The solution should be to introduce a cache for the generated code.
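The proposed fix is to compile each distinct generated code string at most once and reuse the result on later translations. A minimal sketch of such a cache; the class name `GeneratedCodeCache` and the injected `compiler` function are hypothetical stand-ins, not the actual `TraversableSerializer` internals:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

public class GeneratedCodeCache<V> {
    // Compiled artifacts keyed by the generated source code string.
    private final Map<String, V> cache = new ConcurrentHashMap<>();
    private final Function<String, V> compiler;

    public GeneratedCodeCache(Function<String, V> compiler) {
        this.compiler = compiler;
    }

    // Compiles on the first request for a given code string; every
    // subsequent request with the same string is a cheap map lookup.
    public V getOrCompile(String code) {
        return cache.computeIfAbsent(code, compiler);
    }
}
```

Keying by the full code string trades a little memory for skipping the expensive compilation step, which is what makes repeated job submissions fast again.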
[GitHub] [flink] Xeli commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
Xeli commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r263998224 ## File path: flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSource.java ## @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.gcp.pubsub; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.functions.StoppableFunction; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.source.MultipleIdsMessageAcknowledgingSourceBase; +import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory; +import org.apache.flink.util.Preconditions; + +import com.google.api.core.ApiFuture; +import com.google.auth.Credentials; +import com.google.cloud.pubsub.v1.Subscriber; +import com.google.cloud.pubsub.v1.stub.SubscriberStub; +import com.google.pubsub.v1.AcknowledgeRequest; +import com.google.pubsub.v1.ProjectSubscriptionName; +import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.PullRequest; +import com.google.pubsub.v1.PullResponse; +import com.google.pubsub.v1.ReceivedMessage; +import io.grpc.netty.shaded.io.netty.channel.EventLoopGroup; +import io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoopGroup; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CancellationException; +import java.util.concurrent.TimeUnit; + +import static com.google.cloud.pubsub.v1.SubscriptionAdminSettings.defaultCredentialsProviderBuilder; + +/** + * PubSub Source, this Source will consume PubSub messages from a subscription and Acknowledge them on the next checkpoint. + * This ensures every message will get acknowledged at least once. 
+ */ +public class PubSubSource extends MultipleIdsMessageAcknowledgingSourceBase + implements ResultTypeQueryable, ParallelSourceFunction, StoppableFunction { + private static final Logger LOG = LoggerFactory.getLogger(PubSubSource.class); + protected final DeserializationSchema deserializationSchema; + protected final PubSubSubscriberFactory pubSubSubscriberFactory; + protected final Credentials credentials; + protected final String projectSubscriptionName; + protected final int maxMessagesPerPull; + + protected transient boolean deduplicateMessages; + protected transient SubscriberStub subscriber; + protected transient PullRequest pullRequest; + protected transient EventLoopGroup eventLoopGroup; + + protected transient volatile boolean isRunning; + protected transient volatile ApiFuture messagesFuture; + + PubSubSource(DeserializationSchema deserializationSchema, PubSubSubscriberFactory pubSubSubscriberFactory, Credentials credentials, String projectSubscriptionName, int maxMessagesPerPull) { + super(String.class); + this.deserializationSchema = deserializationSchema; + this.pubSubSubscriberFactory = pubSubSubscriberFactory; + this.credentials = credentials; + this.projectSubscriptionName = projectSubscriptionName; + this.maxMessagesPerPull = maxMessagesPerPull; + } + + @Override + public void open(Configuration configuration) throws Exception { + super.open(configuration); + if (hasNoCheckpointingEnabled(getRuntimeContext())) { + throw new IllegalArgumentException("The PubSubSource REQUIRES Checkpointing to be enabled and " + + "the
[GitHub] [flink] Xeli commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
Xeli commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r263998235 ## File path: flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSink.java ## @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.gcp.pubsub; + +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; + +import com.google.api.gax.core.FixedCredentialsProvider; +import com.google.api.gax.core.NoCredentialsProvider; +import com.google.api.gax.grpc.GrpcTransportChannel; +import com.google.api.gax.rpc.FixedTransportChannelProvider; +import com.google.api.gax.rpc.TransportChannel; +import com.google.auth.Credentials; +import com.google.cloud.pubsub.v1.Publisher; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.ProjectTopicName; +import com.google.pubsub.v1.PubsubMessage; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +import static com.google.cloud.pubsub.v1.SubscriptionAdminSettings.defaultCredentialsProviderBuilder; + +/** + * A sink function that outputs to PubSub. 
+ * + * @param type of PubSubSink messages to write + */ +public class PubSubSink extends RichSinkFunction implements CheckpointedFunction { + private static final Logger LOG = LoggerFactory.getLogger(PubSubSink.class); + + private final Credentials credentials; + private final SerializationSchema serializationSchema; + private final String projectName; + private final String topicName; + private final String hostAndPortForEmulator; + + private transient Publisher publisher; + + private PubSubSink( + Credentials credentials, + SerializationSchema serializationSchema, + String projectName, + String topicName, + String hostAndPortForEmulator) { + this.credentials = credentials; + this.serializationSchema = serializationSchema; + this.projectName = projectName; + this.topicName = topicName; + this.hostAndPortForEmulator = hostAndPortForEmulator; + } + + private transient ManagedChannel managedChannel = null; + private transient TransportChannel channel = null; + + @Override + public void open(Configuration configuration) throws Exception { + Publisher.Builder builder = Publisher + .newBuilder(ProjectTopicName.of(projectName, topicName)) + .setCredentialsProvider(FixedCredentialsProvider.create(credentials)); + + if (hostAndPortForEmulator != null) { + managedChannel = ManagedChannelBuilder + .forTarget(hostAndPortForEmulator) + .usePlaintext(true) // This is 'Ok' because this is ONLY used for testing. + .build(); + channel = GrpcTransportChannel.newBuilder().setManagedChannel(managedChannel).build(); + builder.setChannelProvider(FixedTransportChannelProvider.create(channel)) + .setCredentialsProvider(NoCredentialsProvider.create()); + } + + publisher = builder.build(); + } + + @Override + public void close() throws Exception { Review comment: See my comment in the source ---
[GitHub] [flink] Xeli commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
Xeli commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r263998190 ## File path: flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSource.java ## @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.gcp.pubsub; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.functions.StoppableFunction; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.source.MultipleIdsMessageAcknowledgingSourceBase; +import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory; +import org.apache.flink.util.Preconditions; + +import com.google.api.core.ApiFuture; +import com.google.auth.Credentials; +import com.google.cloud.pubsub.v1.Subscriber; +import com.google.cloud.pubsub.v1.stub.SubscriberStub; +import com.google.pubsub.v1.AcknowledgeRequest; +import com.google.pubsub.v1.ProjectSubscriptionName; +import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.PullRequest; +import com.google.pubsub.v1.PullResponse; +import com.google.pubsub.v1.ReceivedMessage; +import io.grpc.netty.shaded.io.netty.channel.EventLoopGroup; +import io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoopGroup; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CancellationException; +import java.util.concurrent.TimeUnit; + +import static com.google.cloud.pubsub.v1.SubscriptionAdminSettings.defaultCredentialsProviderBuilder; + +/** + * PubSub Source, this Source will consume PubSub messages from a subscription and Acknowledge them on the next checkpoint. + * This ensures every message will get acknowledged at least once. 
+ */ +public class PubSubSource&lt;OUT&gt; extends MultipleIdsMessageAcknowledgingSourceBase&lt;OUT, String&gt; + implements ResultTypeQueryable&lt;OUT&gt;, ParallelSourceFunction&lt;OUT&gt;, StoppableFunction { + private static final Logger LOG = LoggerFactory.getLogger(PubSubSource.class); + protected final DeserializationSchema&lt;OUT&gt; deserializationSchema; + protected final PubSubSubscriberFactory pubSubSubscriberFactory; + protected final Credentials credentials; + protected final String projectSubscriptionName; + protected final int maxMessagesPerPull; + + protected transient boolean deduplicateMessages; + protected transient SubscriberStub subscriber; + protected transient PullRequest pullRequest; + protected transient EventLoopGroup eventLoopGroup; + + protected transient volatile boolean isRunning; + protected transient volatile ApiFuture&lt;PullResponse&gt; messagesFuture; + + PubSubSource(DeserializationSchema&lt;OUT&gt; deserializationSchema, PubSubSubscriberFactory pubSubSubscriberFactory, Credentials credentials, String projectSubscriptionName, int maxMessagesPerPull) { + super(String.class); + this.deserializationSchema = deserializationSchema; + this.pubSubSubscriberFactory = pubSubSubscriberFactory; + this.credentials = credentials; + this.projectSubscriptionName = projectSubscriptionName; + this.maxMessagesPerPull = maxMessagesPerPull; + } + + @Override + public void open(Configuration configuration) throws Exception { + super.open(configuration); + if (hasNoCheckpointingEnabled(getRuntimeContext())) { + throw new IllegalArgumentException("The PubSubSource REQUIRES Checkpointing to be enabled and " + + "the
[GitHub] [flink] Xeli commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
Xeli commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r263997975 ## File path: flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSource.java ## @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.gcp.pubsub; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.functions.StoppableFunction; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.source.MultipleIdsMessageAcknowledgingSourceBase; +import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory; +import org.apache.flink.util.Preconditions; + +import com.google.api.core.ApiFuture; +import com.google.auth.Credentials; +import com.google.cloud.pubsub.v1.Subscriber; +import com.google.cloud.pubsub.v1.stub.SubscriberStub; +import com.google.pubsub.v1.AcknowledgeRequest; +import com.google.pubsub.v1.ProjectSubscriptionName; +import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.PullRequest; +import com.google.pubsub.v1.PullResponse; +import com.google.pubsub.v1.ReceivedMessage; +import io.grpc.netty.shaded.io.netty.channel.EventLoopGroup; +import io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoopGroup; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CancellationException; +import java.util.concurrent.TimeUnit; + +import static com.google.cloud.pubsub.v1.SubscriptionAdminSettings.defaultCredentialsProviderBuilder; + +/** + * PubSub Source, this Source will consume PubSub messages from a subscription and Acknowledge them on the next checkpoint. + * This ensures every message will get acknowledged at least once. 
+ */ +public class PubSubSource&lt;OUT&gt; extends MultipleIdsMessageAcknowledgingSourceBase&lt;OUT, String&gt; + implements ResultTypeQueryable&lt;OUT&gt;, ParallelSourceFunction&lt;OUT&gt;, StoppableFunction { + private static final Logger LOG = LoggerFactory.getLogger(PubSubSource.class); + protected final DeserializationSchema&lt;OUT&gt; deserializationSchema; + protected final PubSubSubscriberFactory pubSubSubscriberFactory; + protected final Credentials credentials; + protected final String projectSubscriptionName; + protected final int maxMessagesPerPull; + + protected transient boolean deduplicateMessages; + protected transient SubscriberStub subscriber; + protected transient PullRequest pullRequest; + protected transient EventLoopGroup eventLoopGroup; + + protected transient volatile boolean isRunning; + protected transient volatile ApiFuture&lt;PullResponse&gt; messagesFuture; + + PubSubSource(DeserializationSchema&lt;OUT&gt; deserializationSchema, PubSubSubscriberFactory pubSubSubscriberFactory, Credentials credentials, String projectSubscriptionName, int maxMessagesPerPull) { + super(String.class); + this.deserializationSchema = deserializationSchema; + this.pubSubSubscriberFactory = pubSubSubscriberFactory; + this.credentials = credentials; + this.projectSubscriptionName = projectSubscriptionName; + this.maxMessagesPerPull = maxMessagesPerPull; + } + + @Override + public void open(Configuration configuration) throws Exception { + super.open(configuration); + if (hasNoCheckpointingEnabled(getRuntimeContext())) { + throw new IllegalArgumentException("The PubSubSource REQUIRES Checkpointing to be enabled and " + + "the
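The at-least-once scheme described in the class Javadoc above — emit messages, park their IDs until the next checkpoint, and acknowledge them only once that checkpoint completes — can be sketched without any Flink or Pub/Sub dependencies. All class and method names below are illustrative; they mirror the shape of Flink's `MultipleIdsMessageAcknowledgingSourceBase` lifecycle but are not the PR's actual API:

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Minimal sketch of acknowledge-on-checkpoint for at-least-once delivery:
 * message IDs emitted since the last checkpoint are bundled per checkpoint
 * and acknowledged only when that checkpoint is confirmed complete.
 */
public class AckOnCheckpoint {
    private final List<String> idsSinceLastCheckpoint = new ArrayList<>();
    private final Deque<Map.Entry<Long, List<String>>> pendingPerCheckpoint = new ArrayDeque<>();
    private final Set<String> acknowledged = new HashSet<>();

    /** Called when a message is emitted downstream; its ID is parked, not acked. */
    public void emit(String messageId) {
        idsSinceLastCheckpoint.add(messageId);
    }

    /** Called when checkpoint {@code checkpointId} is triggered: seal the current bundle. */
    public void snapshotState(long checkpointId) {
        pendingPerCheckpoint.add(
            new SimpleImmutableEntry<>(checkpointId, new ArrayList<>(idsSinceLastCheckpoint)));
        idsSinceLastCheckpoint.clear();
    }

    /** Called when a checkpoint completes: ack every bundle up to and including it. */
    public void notifyCheckpointComplete(long checkpointId) {
        while (!pendingPerCheckpoint.isEmpty()
                && pendingPerCheckpoint.peek().getKey() <= checkpointId) {
            acknowledged.addAll(pendingPerCheckpoint.poll().getValue());
        }
    }

    public Set<String> acknowledgedIds() {
        return acknowledged;
    }
}
```

If the job fails between emitting and the checkpoint completing, the IDs are never acknowledged, so Pub/Sub redelivers them — which is exactly why the source demands that checkpointing be enabled.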
[jira] [Commented] (FLINK-11539) Add TypeSerializerSnapshot for TraversableSerializer
[ https://issues.apache.org/jira/browse/FLINK-11539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16788620#comment-16788620 ] Aljoscha Krettek commented on FLINK-11539: -- The solution [~igalshilman] and I were thinking of was to employ a cache in the {{compileCbf}} method here: https://github.com/apache/flink/blob/master/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala. The cache would likely have to be keyed by the code itself (which is a string) and the current Thread or context ClassLoader (because that is used in the method for compilation). > Add TypeSerializerSnapshot for TraversableSerializer > > > Key: FLINK-11539 > URL: https://issues.apache.org/jira/browse/FLINK-11539 > Project: Flink > Issue Type: Improvement > Components: API / Type Serialization System >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > Time Spent: 20m > Remaining Estimate: 0h > > This will replace the deprecated {{TypeSerializerConfigSnapshot}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
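The cache proposed above — keyed by the generated code string together with the ClassLoader used for compilation — can be sketched as follows. This is a hypothetical illustration of the keying scheme only (the class and method names are invented; the real fix would live inside `TraversableSerializer.compileCbf`):

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical sketch of a (code, ClassLoader)-keyed compilation cache. */
public class CompileCache {
    // SimpleImmutableEntry gives us equals/hashCode over both key components,
    // so identical code compiled under the same ClassLoader hits the cache.
    private final Map<Map.Entry<String, ClassLoader>, Object> cache = new ConcurrentHashMap<>();

    /** Invokes the (expensive) compiler function only on a cache miss. */
    public Object compile(String code, ClassLoader cl, Function<String, Object> compiler) {
        return cache.computeIfAbsent(
            new SimpleImmutableEntry<>(code, cl),
            key -> compiler.apply(key.getKey()));
    }

    public int size() {
        return cache.size();
    }
}
```

Since Jürgen notes it is "the same few types over and over again", the hit rate of such a cache should be very high, amortizing the compilation cost to a handful of invocations per ClassLoader.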
[jira] [Commented] (FLINK-11539) Add TypeSerializerSnapshot for TraversableSerializer
[ https://issues.apache.org/jira/browse/FLINK-11539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16788622#comment-16788622 ] Jürgen Kreileder commented on FLINK-11539: -- That probably would help a lot as it's the same few types over and over again. > Add TypeSerializerSnapshot for TraversableSerializer > > > Key: FLINK-11539 > URL: https://issues.apache.org/jira/browse/FLINK-11539 > Project: Flink > Issue Type: Improvement > Components: API / Type Serialization System >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > Time Spent: 20m > Remaining Estimate: 0h > > This will replace the deprecated {{TypeSerializerConfigSnapshot}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11539) Add TypeSerializerSnapshot for TraversableSerializer
[ https://issues.apache.org/jira/browse/FLINK-11539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16788616#comment-16788616 ] Aljoscha Krettek commented on FLINK-11539: -- [~jkreileder] we thought that it could have a slow-down but weren't expecting this much. Does your job have a lot of data types that have a {{TraversableSerializer}} in them? > Add TypeSerializerSnapshot for TraversableSerializer > > > Key: FLINK-11539 > URL: https://issues.apache.org/jira/browse/FLINK-11539 > Project: Flink > Issue Type: Improvement > Components: API / Type Serialization System >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > Time Spent: 20m > Remaining Estimate: 0h > > This will replace the deprecated {{TypeSerializerConfigSnapshot}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11857) Introduce BinaryExternalSorter and BufferedKVExternalSorter to batch table runtime
[ https://issues.apache.org/jira/browse/FLINK-11857?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-11857: --- Labels: pull-request-available (was: ) > Introduce BinaryExternalSorter and BufferedKVExternalSorter to batch table > runtime > -- > > Key: FLINK-11857 > URL: https://issues.apache.org/jira/browse/FLINK-11857 > Project: Flink > Issue Type: New Feature > Components: Runtime / Operators >Reporter: Jingsong Lee >Assignee: Jingsong Lee >Priority: Major > Labels: pull-request-available > > We need a sorter to take full advantage of the high performance of the Binary > format. > For Sort, Sort Aggregation, SortMergeJoin: we need a BinaryExternalSorter to > sort BinaryRows. > For Hash Aggregation:We store the data in HashMap in KeyValue format. When > memory is not enough, we spill all the data in memory onto disk and > degenerate it into Sort Aggregation. So we need a BufferedKVExternalSorter to > write the data that already in memory to disk, and then carry out Sort Merge. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] flinkbot commented on issue #7945: [FLINK-11857][table-runtime-blink] Introduce BinaryExternalSorter and BufferedKVExternalSorter to batch table runtime
flinkbot commented on issue #7945: [FLINK-11857][table-runtime-blink] Introduce BinaryExternalSorter and BufferedKVExternalSorter to batch table runtime URL: https://github.com/apache/flink/pull/7945#issuecomment-471163207 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] JingsongLi opened a new pull request #7945: [FLINK-11857][table-runtime-blink] Introduce BinaryExternalSorter and BufferedKVExternalSorter to batch table runtime
JingsongLi opened a new pull request #7945: [FLINK-11857][table-runtime-blink] Introduce BinaryExternalSorter and BufferedKVExternalSorter to batch table runtime URL: https://github.com/apache/flink/pull/7945 ## What is the purpose of the change We need a sorter to take full advantage of the high performance of the Binary format. For Sort, Sort Aggregation, SortMergeJoin: we need a BinaryExternalSorter to sort BinaryRows. For Hash Aggregation: We store the data in HashMap in KeyValue format. When memory is not enough, we spill all the data in memory onto disk and degenerate it into Sort Aggregation. So we need a BufferedKVExternalSorter to write the data that is already in memory to disk, and then carry out Sort Merge. ## Verifying this change ut & coverage ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (BinaryRowSerializer and BaseRowSerializer) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (yes) - If yes, how is the feature documented? (JavaDocs) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
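The degenerate-to-sort-aggregation path described in the PR — keep (key, value) pairs in a hash map, and when memory runs out, write the map's entries to disk as a sorted run and finish with a sort-merge — can be illustrated with a toy word-count aggregation. This is a deliberately simplified sketch, not BufferedKVExternalSorter: the "disk" is an in-memory list of runs, and the final merge uses a TreeMap for brevity rather than a heap-based k-way merge:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

/** Toy spilling hash aggregation: counts keys, spilling sorted runs past a budget. */
public class SpillingHashAgg {
    private final int maxInMemoryKeys;
    private final Map<String, Long> current = new HashMap<>();
    // Each element models one spilled file: a run of entries sorted by key.
    private final List<List<Map.Entry<String, Long>>> spilledRuns = new ArrayList<>();

    public SpillingHashAgg(int maxInMemoryKeys) {
        this.maxInMemoryKeys = maxInMemoryKeys;
    }

    public void add(String key) {
        current.merge(key, 1L, Long::sum);
        if (current.size() > maxInMemoryKeys) {
            spill();
        }
    }

    private void spill() {
        List<Map.Entry<String, Long>> run = new ArrayList<>(current.entrySet());
        // Sorting each run by key is what makes the final sort-merge possible.
        run.sort(Map.Entry.comparingByKey());
        spilledRuns.add(run);
        current.clear();
    }

    /** Spills the in-memory remainder and merges all sorted runs into final counts. */
    public SortedMap<String, Long> result() {
        spill();
        SortedMap<String, Long> merged = new TreeMap<>();
        for (List<Map.Entry<String, Long>> run : spilledRuns) {
            for (Map.Entry<String, Long> e : run) {
                merged.merge(e.getKey(), e.getValue(), Long::sum);
            }
        }
        return merged;
    }
}
```

The key property mirrored from the PR: partial aggregates are computed per run while memory lasts, so the merge only combines a few pre-aggregated values per key instead of re-reading raw records.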
[jira] [Updated] (FLINK-11857) Introduce BinaryExternalSorter and BufferedKVExternalSorter to batch table runtime
[ https://issues.apache.org/jira/browse/FLINK-11857?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee updated FLINK-11857: - Summary: Introduce BinaryExternalSorter and BufferedKVExternalSorter to batch table runtime (was: Introduce BinaryExternalSorter to batch table runtime) > Introduce BinaryExternalSorter and BufferedKVExternalSorter to batch table > runtime > -- > > Key: FLINK-11857 > URL: https://issues.apache.org/jira/browse/FLINK-11857 > Project: Flink > Issue Type: New Feature > Components: Runtime / Operators >Reporter: Jingsong Lee >Assignee: Jingsong Lee >Priority: Major > > We need a sorter to take full advantage of the high performance of the Binary > format. > For Sort, Sort Aggregation, SortMergeJoin: we need a BinaryExternalSorter to > sort BinaryRows. > For Hash Aggregation:We store the data in HashMap in KeyValue format. When > memory is not enough, we spill all the data in memory onto disk and > degenerate it into Sort Aggregation. So we need a BufferedKVExternalSorter to > write the data that already in memory to disk, and then carry out Sort Merge. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11857) Introduce BinaryExternalSorter to batch table runtime
[ https://issues.apache.org/jira/browse/FLINK-11857?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee updated FLINK-11857: - Description: We need a sorter to take full advantage of the high performance of the Binary format. For Sort, Sort Aggregation, SortMergeJoin: we need a BinaryExternalSorter to sort BinaryRows. For Hash Aggregation:We store the data in HashMap in KeyValue format. When memory is not enough, we spill all the data in memory onto disk and degenerate it into Sort Aggregation. So we need a BufferedKVExternalSorter to write the data that already in memory to disk, and then carry out Sort Merge. was:We need a sorter to take full advantage of the high performance of the Binary format. > Introduce BinaryExternalSorter to batch table runtime > - > > Key: FLINK-11857 > URL: https://issues.apache.org/jira/browse/FLINK-11857 > Project: Flink > Issue Type: New Feature > Components: Runtime / Operators >Reporter: Jingsong Lee >Assignee: Jingsong Lee >Priority: Major > > We need a sorter to take full advantage of the high performance of the Binary > format. > For Sort, Sort Aggregation, SortMergeJoin: we need a BinaryExternalSorter to > sort BinaryRows. > For Hash Aggregation:We store the data in HashMap in KeyValue format. When > memory is not enough, we spill all the data in memory onto disk and > degenerate it into Sort Aggregation. So we need a BufferedKVExternalSorter to > write the data that already in memory to disk, and then carry out Sort Merge. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] KurtYoung commented on a change in pull request #7944: [FLINK-11863][table-runtime-blink] Introduce channel to read and write compressed data
KurtYoung commented on a change in pull request #7944: [FLINK-11863][table-runtime-blink] Introduce channel to read and write compressed data URL: https://github.com/apache/flink/pull/7944#discussion_r263992514 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/io/CompressedBlockChannelReader.java ## @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.io; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader; +import org.apache.flink.runtime.io.disk.iomanager.BufferFileReader; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.disk.iomanager.RequestDoneCallback; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferRecycler; +import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; +import org.apache.flink.table.runtime.compression.BlockCompressionFactory; +import org.apache.flink.table.runtime.compression.BlockCompressor; +import org.apache.flink.table.runtime.compression.BlockDecompressor; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Compressed block channel reader provides a scenario where MemorySegment must be maintained. 
+ */ +public class CompressedBlockChannelReader + implements BlockChannelReader&lt;MemorySegment&gt;, RequestDoneCallback&lt;Buffer&gt;, BufferRecycler { + + private final LinkedBlockingQueue&lt;MemorySegment&gt; blockQueue; + private final boolean copyCompress; + private final BlockDecompressor decompressor; + private final BufferFileReader reader; + private final AtomicReference&lt;IOException&gt; cause; + private final LinkedBlockingQueue&lt;Buffer&gt; retBuffers = new LinkedBlockingQueue&lt;&gt;(); + + private byte[] buf; + private ByteBuffer bufWrapper; + private int offset; + private int len; + + public CompressedBlockChannelReader( + IOManager ioManager, + ID channel, + LinkedBlockingQueue&lt;MemorySegment&gt; blockQueue, + BlockCompressionFactory codecFactory, + int preferBlockSize, + int segmentSize) throws IOException { + this.reader = ioManager.createBufferFileReader(channel, this); + this.blockQueue = blockQueue; + copyCompress = preferBlockSize > segmentSize * 2; + int blockSize = copyCompress ? preferBlockSize : segmentSize; + this.decompressor = codecFactory.getDecompressor(); + cause = new AtomicReference&lt;&gt;(); + + if (copyCompress) { + this.buf = new byte[blockSize]; + this.bufWrapper = ByteBuffer.wrap(buf); + } + + BlockCompressor compressor = codecFactory.getCompressor(); + for (int i = 0; i < 2; i++) { + MemorySegment segment = MemorySegmentFactory.wrap(new byte[compressor.getMaxCompressedSize(blockSize)]); + reader.readInto(new NetworkBuffer(segment, this)); + } + } + + @Override + public void readBlock(MemorySegment segment) throws IOException { + if (cause.get() != null) { + throw cause.get(); + } + + if (copyCompress) { + int readOffset = 0; + int readLen = segment.size(); + + while (readLen > 0) { + int copy = Math.min(readLen, len - offset); + if (copy == 0) { + readBuffer(); + } else { + segment.put(readOffset, buf, offset, copy); +
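The reader above sizes its read buffers with `compressor.getMaxCompressedSize(blockSize)`, i.e. a worst-case bound on how large a compressed block can be. The per-block round trip can be sketched with `java.util.zip` as a stand-in; the PR's `BlockCompressionFactory` API is not shown here, so this is only an analogous illustration, and `maxCompressedSize` below is a hypothetical, deliberately generous bound:

```java
import java.util.Arrays;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

/** Stand-alone sketch of a per-block compress/decompress round trip. */
public class BlockCodec {
    /**
     * Hypothetical upper bound for the compressed size of a block (analogous
     * to getMaxCompressedSize): incompressible input can grow slightly, so the
     * output buffer must be larger than the block itself.
     */
    public static int maxCompressedSize(int blockSize) {
        return blockSize + (blockSize / 16) + 64;
    }

    public static byte[] compress(byte[] block) {
        Deflater deflater = new Deflater();
        deflater.setInput(block);
        deflater.finish();
        // Allocate the worst-case size, then trim to the bytes actually produced.
        byte[] out = new byte[maxCompressedSize(block.length)];
        int n = deflater.deflate(out);
        deflater.end();
        return Arrays.copyOf(out, n);
    }

    public static byte[] decompress(byte[] compressed, int originalSize) {
        Inflater inflater = new Inflater();
        inflater.setInput(compressed);
        byte[] out = new byte[originalSize];
        try {
            inflater.inflate(out);
        } catch (DataFormatException e) {
            throw new IllegalStateException("corrupt compressed block", e);
        } finally {
            inflater.end();
        }
        return out;
    }
}
```

Pre-sizing buffers to the worst-case compressed size is what lets the channel reuse a fixed pool of segments (the two buffers handed to `reader.readInto` in the constructor) instead of allocating per block.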
[jira] [Updated] (FLINK-11856) Introduce BinaryHashTable to batch table runtime
[ https://issues.apache.org/jira/browse/FLINK-11856?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kurt Young updated FLINK-11856: --- Summary: Introduce BinaryHashTable to batch table runtime (was: Introduce BinaryHashTable and LongHashTable to batch table runtime) > Introduce BinaryHashTable to batch table runtime > > > Key: FLINK-11856 > URL: https://issues.apache.org/jira/browse/FLINK-11856 > Project: Flink > Issue Type: Improvement > Components: Runtime / Operators >Reporter: Kurt Young >Assignee: Kurt Young >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)