[jira] [Commented] (FLINK-7439) Support variable arguments for UDTF in SQL
[ https://issues.apache.org/jira/browse/FLINK-7439?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16125307#comment-16125307 ] ASF GitHub Bot commented on FLINK-7439:
---
GitHub user wuchong opened a pull request: https://github.com/apache/flink/pull/4536

[FLINK-7439] [table] Support variable arguments for UDTF in SQL

## What is the purpose of the change

Support variable arguments for UDTF in SQL.

## Brief change log

- `TableSqlFunction`'s operand type inference and checker
- modify `ScalarFunctionCallGen` and `TableFunctionCallGen`, which handle var args incorrectly

## Verifying this change

This change added tests and can be verified as follows:

- *Added `CorrelateTest.scala` for batch and stream queries to verify the logical plan*

The integration path is covered by the existing `CorrelateITCase`.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no

## Documentation

- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? n/a

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/wuchong/flink udtf

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4536.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4536

commit b18ce206ad69e0abf72f5388538bb65e4911c65a
Author: Jark Wu
Date: 2017-08-14T06:18:52Z
[FLINK-7439] [table] Support variable arguments for UDTF in SQL

> Support variable arguments for UDTF in SQL
> ------------------------------------------
>
> Key: FLINK-7439
> URL: https://issues.apache.org/jira/browse/FLINK-7439
> Project: Flink
> Issue Type: Improvement
> Components: Table API & SQL
> Reporter: Jark Wu
> Assignee: Jark Wu
>
> Currently, both UDF and UDAF support variable parameters, but UDTF does not.
> FLINK-5882 supports variable arguments for UDTF in the Table API only, but missed SQL.

-- This message was sent by Atlassian JIRA (v6.4.14#64029)
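The SQL-side var-args support hinges on detecting that a table function's `eval` method is variable-arity, so the operand type checker can accept any number of trailing arguments. As a rough illustration only (plain Java reflection, not Flink's actual `TableSqlFunction` logic; `SplitFunction` and `hasVarArgEval` are made-up names for this sketch), `java.lang.reflect.Method#isVarArgs` expresses such a check:

```java
import java.lang.reflect.Method;

// Hypothetical stand-in for a UDTF: a class exposing a var-arg eval method,
// the shape Flink table functions use. Names here are illustrative only.
class SplitFunction {
    public void eval(String separator, String... inputs) {
        for (String s : inputs) {
            // a real UDTF would emit one row per split result
            System.out.println(s.split(separator).length);
        }
    }
}

public class VarArgCheck {
    /** Returns true if the class declares a var-arg eval method. */
    static boolean hasVarArgEval(Class<?> udtf) {
        for (Method m : udtf.getDeclaredMethods()) {
            if ("eval".equals(m.getName()) && m.isVarArgs()) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(hasVarArgEval(SplitFunction.class)); // true
        System.out.println(hasVarArgEval(String.class));        // false
    }
}
```

A type checker that sees `isVarArgs() == true` can then treat the last declared parameter as repeatable instead of requiring an exact arity match.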
[GitHub] flink pull request #4536: [FLINK-7439] [table] Support variable arguments fo...
GitHub user wuchong opened a pull request: https://github.com/apache/flink/pull/4536

[FLINK-7439] [table] Support variable arguments for UDTF in SQL

## What is the purpose of the change

Support variable arguments for UDTF in SQL.

## Brief change log

- `TableSqlFunction`'s operand type inference and checker
- modify `ScalarFunctionCallGen` and `TableFunctionCallGen`, which handle var args incorrectly

## Verifying this change

This change added tests and can be verified as follows:

- *Added `CorrelateTest.scala` for batch and stream queries to verify the logical plan*

The integration path is covered by the existing `CorrelateITCase`.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no

## Documentation

- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? n/a

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/wuchong/flink udtf

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4536.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4536

commit b18ce206ad69e0abf72f5388538bb65e4911c65a
Author: Jark Wu
Date: 2017-08-14T06:18:52Z
[FLINK-7439] [table] Support variable arguments for UDTF in SQL

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well.
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6180) Remove TestingSerialRpcService
[ https://issues.apache.org/jira/browse/FLINK-6180?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16125301#comment-16125301 ] ASF GitHub Bot commented on FLINK-6180:
---
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4516

Merging this PR.

> Remove TestingSerialRpcService
> ------------------------------
>
> Key: FLINK-6180
> URL: https://issues.apache.org/jira/browse/FLINK-6180
> Project: Flink
> Issue Type: Sub-task
> Components: Distributed Coordination
> Affects Versions: 1.3.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Labels: flip-6
> Fix For: 1.4.0
>
> The {{TestingSerialRpcService}} is problematic because it allows execution interleavings which are not possible when using the {{AkkaRpcService}}, because main thread calls can be executed while another main thread call is still being executed. Therefore, we might test things which are not possible and might not test certain interleavings which occur when using the {{AkkaRpcService}}.
> Therefore, I propose to remove the {{TestingSerialRpcService}} and to refactor the existing tests to use the {{AkkaRpcService}}.
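The interleaving problem described in the issue can be sketched without any RPC machinery: with an actor-style single-threaded executor (the execution model `AkkaRpcService` effectively provides), a call submitted from within a running main-thread call is queued and runs only after the outer call completes, whereas a serial/direct-call service would execute it immediately, mid-call. A minimal plain-Java sketch (class and method names are illustrative, not Flink code):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SerializedCalls {
    /** Runs an outer "main thread" call that schedules an inner one, and returns the execution order. */
    static List<String> run() {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        // A single-threaded executor models the actor main thread.
        ExecutorService mainThread = Executors.newSingleThreadExecutor();
        CountDownLatch done = new CountDownLatch(1);
        mainThread.execute(() -> {
            log.add("outer-start");
            // Re-entrant call: a serial/direct-call RPC service could execute this
            // immediately, interleaving with the outer call. Here it is only queued.
            mainThread.execute(() -> {
                log.add("inner");
                done.countDown();
            });
            log.add("outer-end");
        });
        try {
            done.await();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        mainThread.shutdown();
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        System.out.println(run()); // [outer-start, outer-end, inner]
    }
}
```

A test written against the serial model would observe `inner` running before `outer-end`, an order that never happens under the queued model, which is exactly why the issue argues such tests can be misleading.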
[GitHub] flink issue #4516: [FLINK-6180] [rpc] Remove TestingSerialRpcService
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4516 Merging this PR.
[GitHub] flink pull request #4473: [FLINK-7367][kinesis connector] Parameterize more ...
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/4473#discussion_r132879596

--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
@@ -57,6 +55,9 @@
 	/** Properties to parametrize settings such as AWS service region, access key etc. */
 	private final Properties configProps;

+	/** Configuration for KinesisProducer. */
+	private final KinesisProducerConfiguration producerConfig;
--- End diff --

agree. I'm moving it to `open()`
[jira] [Commented] (FLINK-7367) Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, MaxConnections, RequestTimeout, etc)
[ https://issues.apache.org/jira/browse/FLINK-7367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16125289#comment-16125289 ] ASF GitHub Bot commented on FLINK-7367:
---
Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/4473#discussion_r132879596

--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
@@ -57,6 +55,9 @@
 	/** Properties to parametrize settings such as AWS service region, access key etc. */
 	private final Properties configProps;

+	/** Configuration for KinesisProducer. */
+	private final KinesisProducerConfiguration producerConfig;
--- End diff --

agree. I'm moving it to `open()`

> Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, MaxConnections, RequestTimeout, etc)
> ----------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-7367
> URL: https://issues.apache.org/jira/browse/FLINK-7367
> Project: Flink
> Issue Type: Bug
> Components: Kinesis Connector
> Affects Versions: 1.3.0
> Reporter: Bowen Li
> Assignee: Bowen Li
> Fix For: 1.4.0, 1.3.3
>
> Right now, FlinkKinesisProducer only exposes two configs for the underlying KinesisProducer:
> - AGGREGATION_MAX_COUNT
> - COLLECTION_MAX_COUNT
> According to the [AWS doc|http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-config.html] and [their sample on github|https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties], developers can set more options to make the most of KinesisProducer and make it fault-tolerant (e.g. by increasing timeouts).
> I selected a few more configs that we need when using Flink with Kinesis:
> - MAX_CONNECTIONS
> - RATE_LIMIT
> - RECORD_MAX_BUFFERED_TIME
> - RECORD_TIME_TO_LIVE
> - REQUEST_TIMEOUT
> Flink is using KPL's default values. They make Flink write too fast to Kinesis, which fails Flink jobs too frequently. We need to parameterize FlinkKinesisProducer to pass in the above params in order to slow down Flink's write rate to Kinesis.
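To illustrate the kind of parameterization the ticket asks for, here is a sketch in plain Java of reading such producer settings out of a `Properties` object with defaults and basic validation. The key names mirror the KPL option names listed above, but the helper itself (`getLongOrDefault`) is made up for this example; it is not Flink's actual `KinesisConfigUtil`:

```java
import java.util.Properties;

public class ProducerConfigDemo {
    /**
     * Reads a non-negative long setting from the properties, falling back to a
     * default when the key is absent. The validation here is illustrative only.
     */
    static long getLongOrDefault(Properties props, String key, long def) {
        String v = props.getProperty(key);
        if (v == null) {
            return def;
        }
        long parsed = Long.parseLong(v);
        if (parsed < 0) {
            throw new IllegalArgumentException("Invalid value for " + key + ": " + v);
        }
        return parsed;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        // Keys mirror KPL option names; the default values below are made up.
        props.setProperty("RecordMaxBufferedTime", "500");
        props.setProperty("RequestTimeout", "10000");

        System.out.println(getLongOrDefault(props, "RecordMaxBufferedTime", 100)); // explicit: 500
        System.out.println(getLongOrDefault(props, "MaxConnections", 24));         // fallback: 24
    }
}
```

Raising settings like `RequestTimeout` or lowering `RATE_LIMIT` through such a layer is how the reporter proposes slowing the write rate without code changes.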
[jira] [Commented] (FLINK-7367) Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, MaxConnections, RequestTimeout, etc)
[ https://issues.apache.org/jira/browse/FLINK-7367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16125286#comment-16125286 ] ASF GitHub Bot commented on FLINK-7367: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/4473 @bowenli86 I made some comments on the serializability of the `KinesisProducerConfiguration`. That isn't serializable. Perhaps that is what is causing the failure for you. I'm not really sure why it isn't failing for 1.3.0, though ..
[GitHub] flink issue #4473: [FLINK-7367][kinesis connector] Parameterize more configs...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/4473 @bowenli86 I made some comments on the serializability of the `KinesisProducerConfiguration`. That isn't serializable. Perhaps that is what is causing the failure for you. I'm not really sure why it isn't failing for 1.3.0, though ..
[GitHub] flink pull request #4473: [FLINK-7367][kinesis connector] Parameterize more ...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/4473#discussion_r132879159

--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
@@ -57,6 +55,9 @@
 	/** Properties to parametrize settings such as AWS service region, access key etc. */
 	private final Properties configProps;

+	/** Configuration for KinesisProducer. */
+	private final KinesisProducerConfiguration producerConfig;
--- End diff --

`KinesisProducerConfiguration` isn't serializable. We need to make it `transient`.
[jira] [Commented] (FLINK-7367) Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, MaxConnections, RequestTimeout, etc)
[ https://issues.apache.org/jira/browse/FLINK-7367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16125284#comment-16125284 ] ASF GitHub Bot commented on FLINK-7367:
---
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/4473#discussion_r132879261

--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
@@ -57,6 +55,9 @@
 	/** Properties to parametrize settings such as AWS service region, access key etc. */
 	private final Properties configProps;

+	/** Configuration for KinesisProducer. */
+	private final KinesisProducerConfiguration producerConfig;
--- End diff --

On second thought: do we actually really need to have a field for this? Or can we just instantiate it in the method (it doesn't seem to be used across different methods)?
[GitHub] flink pull request #4473: [FLINK-7367][kinesis connector] Parameterize more ...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/4473#discussion_r132879261

--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
@@ -57,6 +55,9 @@
 	/** Properties to parametrize settings such as AWS service region, access key etc. */
 	private final Properties configProps;

+	/** Configuration for KinesisProducer. */
+	private final KinesisProducerConfiguration producerConfig;
--- End diff --

On second thought: do we actually really need to have a field for this? Or can we just instantiate it in the method (it doesn't seem to be used across different methods)?
[jira] [Commented] (FLINK-7367) Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, MaxConnections, RequestTimeout, etc)
[ https://issues.apache.org/jira/browse/FLINK-7367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16125283#comment-16125283 ] ASF GitHub Bot commented on FLINK-7367:
---
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/4473#discussion_r132879203

--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
@@ -115,13 +116,13 @@ public String getTargetStream(OUT element) {
 	 * This is a constructor supporting {@see KinesisSerializationSchema}.
 	 *
 	 * @param schema Kinesis serialization schema for the data type
-	 * @param configProps The properties used to configure AWS credentials and AWS region
+	 * @param configProps The properties used to configure KinesisProducer, including AWS credentials and AWS region
 	 */
 	public FlinkKinesisProducer(KinesisSerializationSchema schema, Properties configProps) {
-		this.configProps = checkNotNull(configProps, "configProps can not be null");
-
-		// check the configuration properties for any conflicting settings
-		KinesisConfigUtil.validateProducerConfiguration(this.configProps);
+		checkNotNull(configProps, "configProps can not be null");
+		this.configProps = KinesisConfigUtil.replaceDeprecatedProducerKeys(configProps);
+		// check the configuration properties for any invalid settings
+		this.producerConfig = KinesisConfigUtil.validateProducerConfiguration(configProps);
--- End diff --

For non-serializable fields that need to be `transient`, we should only initialize them in `open`.
[jira] [Commented] (FLINK-7367) Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, MaxConnections, RequestTimeout, etc)
[ https://issues.apache.org/jira/browse/FLINK-7367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16125285#comment-16125285 ] ASF GitHub Bot commented on FLINK-7367: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/4473#discussion_r132879159 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java --- @@ -57,6 +55,9 @@ /** Properties to parametrize settings such as AWS service region, access key etc. */ private final Properties configProps; + /** Configuration for KinesisProducer. */ + private final KinesisProducerConfiguration producerConfig; --- End diff -- `KinesisProducerConfiguration` isn't serializable. We need to make it `transient`.
[GitHub] flink pull request #4473: [FLINK-7367][kinesis connector] Parameterize more ...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/4473#discussion_r132879203

--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---
@@ -115,13 +116,13 @@ public String getTargetStream(OUT element) {
 	 * This is a constructor supporting {@see KinesisSerializationSchema}.
 	 *
 	 * @param schema Kinesis serialization schema for the data type
-	 * @param configProps The properties used to configure AWS credentials and AWS region
+	 * @param configProps The properties used to configure KinesisProducer, including AWS credentials and AWS region
 	 */
 	public FlinkKinesisProducer(KinesisSerializationSchema schema, Properties configProps) {
-		this.configProps = checkNotNull(configProps, "configProps can not be null");
-
-		// check the configuration properties for any conflicting settings
-		KinesisConfigUtil.validateProducerConfiguration(this.configProps);
+		checkNotNull(configProps, "configProps can not be null");
+		this.configProps = KinesisConfigUtil.replaceDeprecatedProducerKeys(configProps);
+		// check the configuration properties for any invalid settings
+		this.producerConfig = KinesisConfigUtil.validateProducerConfiguration(configProps);
--- End diff --

For non-serializable fields that need to be `transient`, we should only initialize them in `open`.
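The pattern the reviewers converge on, keeping only the serializable `Properties` in the sink object and rebuilding the non-serializable configuration in `open()` behind a `transient` field, can be sketched in plain Java without any Flink dependency. Both `FakeProducerConfig` and the demo class below are stand-ins, not the real `KinesisProducerConfiguration` or `FlinkKinesisProducer`:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Properties;

// Stand-in for KinesisProducerConfiguration: deliberately NOT Serializable.
class FakeProducerConfig {
    final long requestTimeout;
    FakeProducerConfig(long requestTimeout) { this.requestTimeout = requestTimeout; }
}

public class TransientFieldDemo implements Serializable {
    final Properties configProps;                 // serializable, shipped with the sink
    transient FakeProducerConfig producerConfig;  // skipped by serialization, rebuilt on the worker

    TransientFieldDemo(Properties configProps) {
        this.configProps = configProps;
    }

    /** Mirrors RichSinkFunction#open: build non-serializable state here, not in the constructor. */
    void open() {
        long timeout = Long.parseLong(configProps.getProperty("RequestTimeout", "6000"));
        this.producerConfig = new FakeProducerConfig(timeout);
    }

    /** Java-serialization round trip, standing in for shipping the sink to a task manager. */
    static TransientFieldDemo roundTrip(TransientFieldDemo obj) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(obj);
            }
            try (ObjectInputStream ois =
                     new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
                return (TransientFieldDemo) ois.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("RequestTimeout", "10000");
        // Serialization succeeds because the non-serializable field is transient.
        TransientFieldDemo sink = roundTrip(new TransientFieldDemo(props));
        sink.open();
        System.out.println(sink.producerConfig.requestTimeout); // 10000
    }
}
```

Had `producerConfig` been a non-transient field assigned in the constructor, the round trip would throw `NotSerializableException`, which is exactly the `ClosureCleaner` failure reported on this PR.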
[jira] [Commented] (FLINK-7367) Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, MaxConnections, RequestTimeout, etc)
[ https://issues.apache.org/jira/browse/FLINK-7367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16125280#comment-16125280 ] ASF GitHub Bot commented on FLINK-7367:
---
Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/4473

@tzulitai it's failing on

```
The implementation of the RichSinkFunction is not serializable. The object probably contains or references non serializable fields.
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:100)
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:183)
org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1131)
```

FYI, I merged this PR into my 1.3.2 source code.
[jira] [Updated] (FLINK-7440) Add eager serializable checks on provided de-/serialization schemas for Kinesis consumer / producer
[ https://issues.apache.org/jira/browse/FLINK-7440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-7440:
---
Description: For better user experience, we should add eager serializable checks on the provided {{KinesisDeserializationSchema}} / {{KinesisSerializationSchema}}, with better error messages pointing out exactly that the serialization schema isn't serializable. (was: For better user experience, we should add eager serializable checks on the provided `KinesisDeserializationSchema` / `KinesisSerializationSchema`, with better error messages pointing out exactly that the serialization schema isn't serializable.)

> Add eager serializable checks on provided de-/serialization schemas for Kinesis consumer / producer
> ---------------------------------------------------------------------------------------------------
>
> Key: FLINK-7440
> URL: https://issues.apache.org/jira/browse/FLINK-7440
> Project: Flink
> Issue Type: Improvement
> Components: Kinesis Connector
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.4.0, 1.3.3
>
> For better user experience, we should add eager serializable checks on the provided {{KinesisDeserializationSchema}} / {{KinesisSerializationSchema}}, with better error messages pointing out exactly that the serialization schema isn't serializable.
[jira] [Created] (FLINK-7440) Add eager serializable checks on provided de-/serialization schemas for Kinesis consumer / producer
Tzu-Li (Gordon) Tai created FLINK-7440: -- Summary: Add eager serializable checks on provided de-/serialization schemas for Kinesis consumer / producer Key: FLINK-7440 URL: https://issues.apache.org/jira/browse/FLINK-7440 Project: Flink Issue Type: Improvement Components: Kinesis Connector Reporter: Tzu-Li (Gordon) Tai Assignee: Tzu-Li (Gordon) Tai Fix For: 1.4.0, 1.3.3 For better user experience, we should add eager serializable checks on the provided `KinesisDeserializationSchema` / `KinesisSerializationSchema`, with better error messages pointing out exactly that the serialization schema isn't serializable.
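One way such an eager check can work is the usual Java-serialization probe: attempt to serialize the provided schema when it is handed over, and fail fast with a message naming the offending object instead of at job submission time. This sketch is not Flink's actual `ClosureCleaner` or the FLINK-7440 implementation; the helper name is made up:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;

public class SerializabilityCheck {
    /**
     * Eagerly verify that an object can actually be Java-serialized,
     * throwing a descriptive error on failure. (Illustrative helper;
     * a real check might also clean closures first.)
     */
    static void checkSerializable(Object obj, String name) {
        try (ObjectOutputStream oos = new ObjectOutputStream(new ByteArrayOutputStream())) {
            oos.writeObject(obj);
        } catch (IOException e) {
            // NotSerializableException is an IOException, so it lands here.
            throw new IllegalArgumentException(
                "The provided " + name + " is not serializable: " + obj.getClass().getName(), e);
        }
    }

    public static void main(String[] args) {
        // String is serializable: passes silently.
        checkSerializable("a serializable schema stand-in", "serialization schema");
        try {
            // Plain Object is not Serializable: fails with a pointed message.
            checkSerializable(new Object(), "serialization schema");
        } catch (IllegalArgumentException expected) {
            System.out.println(expected.getMessage());
        }
    }
}
```

Calling such a check from the consumer/producer constructor surfaces the problem at pipeline construction time, with the schema named explicitly, rather than as a generic "RichSinkFunction is not serializable" failure later.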
[GitHub] flink issue #4473: [FLINK-7367][kinesis connector] Parameterize more configs...
Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/4473

@tzulitai it's failing on

```
The implementation of the RichSinkFunction is not serializable. The object probably contains or references non serializable fields.
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:100)
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:183)
org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1131)
```

FYI, I merged this PR into my 1.3.2 source code.
[jira] [Commented] (FLINK-7367) Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, MaxConnections, RequestTimeout, etc)
[ https://issues.apache.org/jira/browse/FLINK-7367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16125272#comment-16125272 ] ASF GitHub Bot commented on FLINK-7367: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/4473 @bowenli86 what is it failing on? AFAIK, the only serialization changes in 1.3.2 compared to 1.3.0 are serialization of some specific `TypeSerializer`s.
[jira] [Commented] (FLINK-7367) Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, MaxConnections, RequestTimeout, etc)
[ https://issues.apache.org/jira/browse/FLINK-7367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16125270#comment-16125270 ] ASF GitHub Bot commented on FLINK-7367: --- Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/4473 @tzulitai Thanks Gordon for the quick response! You are right: I made the schema in the unit test static and it passes. As for my Flink job: when I build it against Flink 1.3.0, it runs well on EMR; when I build it against Flink 1.3.2, it fails to run. Has anything related to serialization changed since Flink 1.3.0?
[jira] [Commented] (FLINK-7367) Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, MaxConnections, RequestTimeout, etc)
[ https://issues.apache.org/jira/browse/FLINK-7367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16125264#comment-16125264 ] ASF GitHub Bot commented on FLINK-7367: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/4473 @bowenli86 on the other hand, in the constructors of `FlinkKinesisProducer`, we should probably add eager safety checks on the serializability of the provided `KinesisSerializationSchema`, with good error messages in case it isn't serializable. I'll open a separate JIRA / PR for that.
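The eager check suggested above can be illustrated with a minimal, Flink-free sketch. This is not the actual `FlinkKinesisProducer` code: the `Schema` interface and class name here are hypothetical stand-ins, and plain JDK serialization is used in place of Flink's own utilities, which do essentially the same trial serialization plus closure cleaning.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class EagerSerializabilityCheck {

    // Hypothetical stand-in for KinesisSerializationSchema: a user-provided
    // callback that must travel with the job graph.
    interface Schema extends Serializable {
        byte[] serialize(String element);
    }

    private final Schema schema;

    // Fail fast in the constructor with a descriptive message, instead of the
    // generic "RichSinkFunction is not serializable" error at submission time.
    public EagerSerializabilityCheck(Schema schema) {
        if (!isSerializable(schema)) {
            throw new IllegalArgumentException(
                "The provided serialization schema is not serializable. "
                + "If it is an anonymous or inner class, it may hold a reference "
                + "to a non-serializable enclosing instance.");
        }
        this.schema = schema;
    }

    // Trial-serialize the object; any IOException (including
    // NotSerializableException) means the check fails.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) {
            return false;
        }
    }
}
```

Running the check in the constructor surfaces the problem at graph-construction time, where the stack trace points at the user's code rather than at the closure cleaner.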
[GitHub] flink issue #4473: [FLINK-7367][kinesis connector] Parameterize more configs...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/4473 @bowenli86 I think the test you posted above is failing because the provided `KinesisSerializationSchema` is not serializable. It is an anonymous class, which contains a reference to the enclosing class (which I guess is the test class, hence not serializable). To make it work, you either need to make the test class serializable, or define a static `KinesisSerializationSchema` subclass instead of using an anonymous class.
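The mechanism described above can be reproduced with plain JDK serialization, independent of Flink. This is a sketch with hypothetical class names: an anonymous class created inside an instance method implicitly captures the enclosing instance, so serializing it drags in the (non-serializable) enclosing class, while a static nested class has no such hidden reference.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class EnclosingCapture {

    // Hypothetical stand-in for KinesisSerializationSchema.
    interface Schema extends Serializable {
        String getTargetStream(String element);
    }

    // Plays the role of the non-serializable test class.
    static class NonSerializableTest {
        // The anonymous class implicitly holds a reference to the enclosing
        // NonSerializableTest instance, so serializing it fails.
        Schema anonymousSchema() {
            return new Schema() {
                @Override
                public String getTargetStream(String element) {
                    return "stream";
                }
            };
        }
    }

    // The fix: a static nested class has no hidden reference to any
    // enclosing instance, so it serializes cleanly.
    static class StaticSchema implements Schema {
        @Override
        public String getTargetStream(String element) {
            return "stream";
        }
    }

    // Trial serialization with the JDK; false means NotSerializableException.
    static boolean serializes(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) {
            return false;
        }
    }
}
```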
[jira] [Commented] (FLINK-7367) Parameterize more configs for FlinkKinesisProducer (RecordMaxBufferedTime, MaxConnections, RequestTimeout, etc)
[ https://issues.apache.org/jira/browse/FLINK-7367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16125252#comment-16125252 ] ASF GitHub Bot commented on FLINK-7367: --- Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/4473 hmmm... I was deploying our Flink job with this change. The Flink job failed to start, and the log reports:
```
The implementation of the RichSinkFunction is not serializable. The object probably contains or references non serializable fields.
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:100)
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:183)
org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1131)
```
Here the implementation of the RichSinkFunction is FlinkKinesisProducer. The only field I added to FlinkKinesisProducer is KinesisProducerConfiguration, so I made KinesisProducerConfiguration `transient` and ran the Flink job again, but it still failed. That made me suspect that FlinkKinesisProducer is already not serializable today. To verify that, I created a test for the current FlinkKinesisProducer in master, which doesn't have my PR change. The unit test is:
```
@Test
public void testProducerSerializable() {
	Properties testConfig = new Properties();
	testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKey");
	testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
	testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
	testConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
	KinesisConfigUtil.validateAwsConfiguration(testConfig);

	FlinkKinesisProducer<Object> producer = new FlinkKinesisProducer<>(new KinesisSerializationSchema<Object>() {
		@Override
		public ByteBuffer serialize(Object element) {
			return null;
		}

		@Override
		public String getTargetStream(Object element) {
			return null;
		}
	}, testConfig);

	ClosureCleaner.ensureSerializable(producer);
}
```
And it fails. Thus, I think something in FlinkKinesisProducer might not be serializable and already breaks FlinkKinesisProducer. @tzulitai @aljoscha Any insights on this?
[jira] [Commented] (FLINK-1234) Make Hadoop2 profile default
[ https://issues.apache.org/jira/browse/FLINK-1234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16125243#comment-16125243 ] ASF GitHub Bot commented on FLINK-1234: --- GitHub user zhuganghuaonnet opened a pull request: https://github.com/apache/flink/pull/4535 Eventhubs-support read from and write to Azure eventhubs *Thank you very much for contributing to Apache Flink - we are happy that you want to help us improve Flink. To help the community review your contribution in the best possible way, please go through the checklist below, which will get the contribution into a shape in which it can be best reviewed.* *Please understand that we do not do this to make contributions to Flink a hassle. In order to uphold a high standard of quality for code contributions, while at the same time managing a large number of contributions, we need contributors to prepare the contributions well, and give reviewers enough contextual information for the review. Please also understand that contributions that do not follow this guide will take longer to review and thus typically be picked up with lower priority by the community.* ## Contribution Checklist - Make sure that the pull request corresponds to a [JIRA issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are made for typos in JavaDoc or documentation files, which need no JIRA issue. - Name the pull request in the form "[FLINK-1234] [component] Title of the pull request", where *FLINK-1234* should be replaced by the actual issue number. Skip *component* if you are unsure about which is the best component. Typo fixes that have no associated JIRA issue should be named following this pattern: `[hotfix] [docs] Fix typo in event time introduction` or `[hotfix] [javadocs] Expand JavaDoc for PunctuatedWatermarkGenerator`. - Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review. 
- Make sure that the change passes the automated tests, i.e., `mvn clean verify` passes. You can set up Travis CI to do that following [this guide](http://flink.apache.org/contribute-code.html#best-practices). - Each pull request should address only one issue, not mix up code from multiple issues. - Each commit in the pull request has a meaningful commit message (including the JIRA id) - Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below. **(The sections below can be removed for hotfixes of typos)** ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. 
*(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/zhuganghuaonnet/flink eventhubs Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4535.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4535
[jira] [Updated] (FLINK-7439) Support variable arguments for UDTF in SQL
[ https://issues.apache.org/jira/browse/FLINK-7439?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu updated FLINK-7439: --- Description: Currently, both UDF and UDAF support variable parameters, but UDTF does not. FLINK-5882 supports variable arguments for UDTF in the Table API only, but missed SQL. was:Currently, both UDF and UDAF support variable parameters, but UDTF not. > Support variable arguments for UDTF in SQL > -- > > Key: FLINK-7439 > URL: https://issues.apache.org/jira/browse/FLINK-7439 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL > Reporter: Jark Wu > Assignee: Jark Wu > > Currently, both UDF and UDAF support variable parameters, but UDTF does not. > FLINK-5882 supports variable arguments for UDTF in the Table API only, but missed SQL.
[jira] [Updated] (FLINK-7439) Support variable arguments for UDTF in SQL
[ https://issues.apache.org/jira/browse/FLINK-7439?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu updated FLINK-7439: --- Summary: Support variable arguments for UDTF in SQL (was: Support variable arguments for UDTF) > Support variable arguments for UDTF in SQL > -- > > Key: FLINK-7439 > URL: https://issues.apache.org/jira/browse/FLINK-7439 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL > Reporter: Jark Wu > Assignee: Jark Wu > > Currently, both UDF and UDAF support variable parameters, but UDTF does not.
[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...
Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/4331#discussion_r132867483

--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
```
@@ -340,6 +362,65 @@ public void resetNFAChanged() {
 		return Tuple2.of(result, timeoutResult);
 	}

+	private void discardComputationStatesAccordingToStrategy(Queue<ComputationState<T>> computationStates,
+		Collection<Map<String, List<T>>> matchedResult, AfterMatchSkipStrategy afterMatchSkipStrategy) {
+		Set<T> discardEvents = new HashSet<>();
+		switch(afterMatchSkipStrategy.getStrategy()) {
+			case SKIP_TO_LAST:
+				for (Map<String, List<T>> resultMap: matchedResult) {
+					for (Map.Entry<String, List<T>> keyMatches : resultMap.entrySet()) {
+						if (keyMatches.getKey().equals(afterMatchSkipStrategy.getPatternName())) {
+							discardEvents.addAll(keyMatches.getValue().subList(0, keyMatches.getValue().size() - 1));
+							break;
+						} else {
+							discardEvents.addAll(keyMatches.getValue());
+						}
+					}
+				}
+				break;
+			case SKIP_TO_FIRST:
+				for (Map<String, List<T>> resultMap: matchedResult) {
+					for (Map.Entry<String, List<T>> keyMatches : resultMap.entrySet()) {
+						if (keyMatches.getKey().equals(afterMatchSkipStrategy.getPatternName())) {
+							break;
+						} else {
+							discardEvents.addAll(keyMatches.getValue());
+						}
+					}
+				}
+				break;
+			case SKIP_PAST_LAST_EVENT:
+				for (Map<String, List<T>> resultMap: matchedResult) {
+					for (List<T> eventList: resultMap.values()) {
+						discardEvents.addAll(eventList);
+					}
+				}
+				break;
+		}
+		if (!discardEvents.isEmpty()) {
+			List<ComputationState<T>> discardStates = new ArrayList<>();
+			for (ComputationState<T> computationState : computationStates) {
+				Map<String, List<T>> partialMatch = extractCurrentMatches(computationState);
+				for (List<T> list: partialMatch.values()) {
+					for (T e: list) {
+						if (discardEvents.contains(e)) {
+							// discard the computation state.
+							eventSharedBuffer.release(
+								NFAStateNameHandler.getOriginalNameFromInternal(
+									computationState.getState().getName()),
+								computationState.getEvent(),
+								computationState.getTimestamp(),
+								computationState.getCounter()
+							);
+							discardStates.add(computationState);
```
--- End diff --

We should add **break;** after **discardStates.add(computationState);**, right?
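For reference, the three skip strategies handled in the diff above can be modeled with a self-contained toy version, using String events in place of the generic event type and a plain map in place of the NFA's matched result. This mirrors the switch structure of the diff but is not the actual NFA code; the class and method names are hypothetical.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class SkipStrategyDemo {

    enum Strategy { SKIP_TO_FIRST, SKIP_TO_LAST, SKIP_PAST_LAST_EVENT }

    /**
     * Toy version of the discard logic: given one matched result
     * (pattern name -> its events, in pattern order), collect the events
     * that the chosen strategy discards from the shared buffer.
     */
    static Set<String> eventsToDiscard(Map<String, List<String>> match,
                                       Strategy strategy, String patternName) {
        Set<String> discard = new HashSet<>();
        switch (strategy) {
            case SKIP_TO_LAST:
                // Discard everything before the last event of the named pattern.
                for (Map.Entry<String, List<String>> e : match.entrySet()) {
                    if (e.getKey().equals(patternName)) {
                        discard.addAll(e.getValue().subList(0, e.getValue().size() - 1));
                        break;
                    }
                    discard.addAll(e.getValue());
                }
                break;
            case SKIP_TO_FIRST:
                // Discard everything before the first event of the named pattern.
                for (Map.Entry<String, List<String>> e : match.entrySet()) {
                    if (e.getKey().equals(patternName)) {
                        break;
                    }
                    discard.addAll(e.getValue());
                }
                break;
            case SKIP_PAST_LAST_EVENT:
                // Discard every event of the whole match.
                for (List<String> events : match.values()) {
                    discard.addAll(events);
                }
                break;
        }
        return discard;
    }
}
```

For a match {A: [a1, a2], B: [b1, b2]} with pattern name "B", SKIP_TO_FIRST discards {a1, a2}, SKIP_TO_LAST discards {a1, a2, b1}, and SKIP_PAST_LAST_EVENT discards all four events.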
[jira] [Commented] (FLINK-7169) Support AFTER MATCH SKIP function in CEP library API
[ https://issues.apache.org/jira/browse/FLINK-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16125189#comment-16125189 ] ASF GitHub Bot commented on FLINK-7169: --- Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/4331#discussion_r132867483 > Support AFTER MATCH SKIP function in CEP library API > > > Key: FLINK-7169 > URL: https://issues.apache.org/jira/browse/FLINK-7169 > Project: Flink > Issue Type: Sub-task > Components: CEP > Reporter: Yueting Chen > Assignee: Yueting Chen > Fix For: 1.4.0 > > > In order to support Oracle's MATCH_RECOGNIZE on top of the CEP library, we > need to support the AFTER MATCH SKIP function in the CEP API. > There are four options in AFTER MATCH SKIP, listed as follows: > 1. AFTER M
[jira] [Commented] (FLINK-6233) Support rowtime inner equi-join between two streams in the SQL API
[ https://issues.apache.org/jira/browse/FLINK-6233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16125182#comment-16125182 ] Xingcan Cui commented on FLINK-6233: Hi [~fhueske], thanks to you and yuhong for the previous work; the implementation is going well. Nevertheless, I've got some minor questions/ideas.
# Considering that the core logic of the rowtime inner join and the proctime inner join is almost the same, can I extract an abstract {{TimeWindowInnerJoin}} class and let {{ProcTimeWindowInnerJoin}} and {{RowTimeWindowInnerJoin}} extend it?
# The clean-up process for the cached data is triggered by ProcessingTimeTimers in the {{ProcTimeWindowInnerJoin}}. For {{RowTimeWindowInnerJoin}}, I think this process could be directly triggered by the watermarks, without registering an EventTimeTimer, right?
# Since the collections provided by the state backend are simple, it may be inefficient to search for the out-of-date records. I think the current "short-circuit" code (shown below) cannot clean all the expired data.
{code:java}
while (keyIter.hasNext && !validTimestamp) {
  val recordTime = keyIter.next
  if (recordTime < expiredTime) {
    removeList.add(recordTime)
  } else {
    // we found a timestamp that is still valid
    validTimestamp = true
  }
}
{code}
To cope with that, I plan to split the "cache window" into contiguous static panes and expire a pane as a whole. That way we may store some extra records, for a time interval equal to the static span of a pane, but can remove the expired data efficiently.
# I'd like to introduce an extra {{allowLateness}} parameter (which can be set in the {{StreamQueryConfig}}) for the join function. For now, I'll give it a default value of {{0L}}.
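The pane-based clean-up idea proposed in the comment above could look roughly like the following. This is a minimal, Flink-independent sketch under stated assumptions: `paneSize` and the `String` record type are illustrative, and a `TreeMap` stands in for the keyed state backend.

```java
import java.util.*;

/** Sketch: assign cached records to fixed-width panes and expire whole panes at once. */
public class PaneCache {
    private final long paneSize;                       // static span of one pane, in ms
    private final TreeMap<Long, List<String>> panes = new TreeMap<>(); // paneIndex -> records

    public PaneCache(long paneSize) { this.paneSize = paneSize; }

    public void add(long timestamp, String record) {
        panes.computeIfAbsent(timestamp / paneSize, k -> new ArrayList<>()).add(record);
    }

    /** Drop every pane that ends at or before the expiration time, as a whole. */
    public int expire(long expiredTime) {
        int removed = 0;
        // a pane with index p covers the interval [p * paneSize, (p + 1) * paneSize)
        Iterator<Map.Entry<Long, List<String>>> it = panes.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<String>> pane = it.next();
            if ((pane.getKey() + 1) * paneSize <= expiredTime) {
                removed += pane.getValue().size();
                it.remove();
            } else {
                break; // panes are ordered; later panes are still valid
            }
        }
        return removed;
    }
}
```

Records whose timestamps have expired but whose pane still overlaps valid time are retained until the whole pane expires, which is exactly the "extra records" trade-off described in the comment.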
> Support rowtime inner equi-join between two streams in the SQL API
>
> Key: FLINK-6233
> URL: https://issues.apache.org/jira/browse/FLINK-6233
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: hongyuhong
> Assignee: Xingcan Cui
>
> The goal of this issue is to add support for inner equi-joins on row-time
> streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT o.rowtime, o.productId, o.orderId, s.rowtime AS shipTime
> FROM Orders AS o
> JOIN Shipments AS s
> ON o.orderId = s.orderId
> AND o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL '1' HOUR;
> {code}
> The following restrictions should initially apply:
> * The join hint only supports inner joins.
> * The ON clause should include an equi-join condition.
> * The time condition {{o.rowtime BETWEEN s.rowtime AND s.rowtime + INTERVAL '1' HOUR}}
> can only use rowtime, which is a system attribute. The time condition only
> supports a bounded time range like {{o.rowtime BETWEEN s.rowtime - INTERVAL '1' HOUR
> AND s.rowtime + INTERVAL '1' HOUR}}, not an unbounded one like
> {{o.rowtime < s.rowtime}}, and it should include the rowtime attributes of both
> streams; {{o.rowtime between rowtime () and rowtime () + 1}} should also not be
> supported.
> A row-time stream join will not be able to handle late data, because inserting
> a row into a sorted order would shift all other computations. This would be too
> expensive to maintain. Therefore, we will throw an error if a user tries to use
> a row-time stream join with late data handling.
> This issue includes:
> * Design of the DataStream operator to deal with the stream join
> * Translation from Calcite's RelNode representation (LogicalJoin).
-- This message was sent by Atlassian JIRA (v6.4.14#64029)
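The bounded-range time condition in the example query above can be reduced to a simple predicate plus a clean-up time per cached row. The following is an illustrative sketch, not Flink code; the class and method names are assumptions, and bounds are in milliseconds:

```java
/** Sketch of the bounded-range rowtime join condition from the example query. */
public class BoundedRangeJoin {
    private final long lowerBound; // 0 for "o.rowtime BETWEEN s.rowtime ..."
    private final long upperBound; // 3_600_000 for "... AND s.rowtime + INTERVAL '1' HOUR"

    public BoundedRangeJoin(long lowerBound, long upperBound) {
        this.lowerBound = lowerBound;
        this.upperBound = upperBound;
    }

    /** o.rowtime BETWEEN s.rowtime + lowerBound AND s.rowtime + upperBound */
    public boolean matches(long orderRowtime, long shipmentRowtime) {
        return orderRowtime >= shipmentRowtime + lowerBound
            && orderRowtime <= shipmentRowtime + upperBound;
    }

    /** A cached shipment row can be cleaned up once the watermark passes this time. */
    public long cleanupTime(long shipmentRowtime) {
        return shipmentRowtime + upperBound;
    }
}
```

The bounded range is what makes state clean-up possible at all: with an unbounded condition like `o.rowtime < s.rowtime`, no cached row would ever become safe to discard, which is why the restriction above exists.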
[jira] [Commented] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers
[ https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16125145#comment-16125145 ] ASF GitHub Bot commented on FLINK-7378: --- Github user zhijiangW commented on the issue: https://github.com/apache/flink/pull/4485 @NicoK , I have submitted the updates: - Create the fixed-size `LocalBufferPool` for floating buffers - Assign the exclusive buffers to `InputChannel`s directly - The proposed `BufferPoolListener` will be included in the next PR
> Create a fix size (non rebalancing) buffer pool type for the floating buffers
>
> Key: FLINK-7378
> URL: https://issues.apache.org/jira/browse/FLINK-7378
> Project: Flink
> Issue Type: Sub-task
> Components: Core
> Reporter: zhijiang
> Assignee: zhijiang
> Fix For: 1.4.0
>
> Currently the number of network buffers in {{LocalBufferPool}} for
> {{SingleInputGate}} is limited by {{a * <number of channels> + b}}, where a
> is the number of exclusive buffers for each channel and b is the number of
> floating buffers shared by all channels.
> Considering the credit-based flow control feature, we want to create a
> fixed-size buffer pool used to manage the floating buffers for
> {{SingleInputGate}}. And the exclusive buffers are assigned to
> {{InputChannel}}s directly.
-- This message was sent by Atlassian JIRA (v6.4.14#64029)
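To make the issue description concrete: with a channels-per-gate count of `n`, the gate uses up to `a * n + b` buffers, where the `a * n` exclusive buffers live at the channels and only the `b` floating buffers come from a pool whose size never changes (no rebalancing). A minimal sketch of such a fixed-size pool, not Flink's actual `LocalBufferPool`:

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Sketch of a fixed-size (non-rebalancing) pool for floating buffers. */
public class FixedSizeBufferPool {
    private final Deque<byte[]> available = new ArrayDeque<>();

    public FixedSizeBufferPool(int numBuffers, int bufferSize) {
        // the pool size is fixed at creation time and is never rebalanced
        for (int i = 0; i < numBuffers; i++) {
            available.push(new byte[bufferSize]);
        }
    }

    /** Returns a floating buffer, or null if none is currently available. */
    public synchronized byte[] requestBuffer() {
        return available.poll();
    }

    public synchronized void recycle(byte[] buffer) {
        available.push(buffer);
    }

    public synchronized int availableBuffers() {
        return available.size();
    }
}
```

In credit-based flow control, a channel that exhausts its exclusive buffers would request floating buffers from such a pool and recycle them once their data is consumed.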
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16125142#comment-16125142 ] ASF GitHub Bot commented on FLINK-7337: --- Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/4532#discussion_r132863952 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/ProctimeSqlFunction.scala --- @@ -22,12 +22,12 @@ import org.apache.calcite.sql.`type`._ import org.apache.calcite.sql.validate.SqlMonotonicity /** - * Function that materializes a time attribute to the metadata timestamp. After materialization - * the result can be used in regular arithmetical calculations. + * Function that materializes a processing time attribute. + * After materialization the result can be used in regular arithmetical calculations. */ -object TimeMaterializationSqlFunction +object ProctimeSqlFunction --- End diff -- Should we move this object to `org.apache.flink.table.functions.sql` package? > Refactor handling of time indicator attributes > -- > > Key: FLINK-7337 > URL: https://issues.apache.org/jira/browse/FLINK-7337 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Fabian Hueske >Assignee: Fabian Hueske > > After a [discussion on the dev mailing > list|https://lists.apache.org/thread.html/735d55f9022df8ff73566a9f1553e14be94f8443986ad46559b35869@%3Cdev.flink.apache.org%3E] > I propose the following changes to the current handling of time indicator > attributes: > * Remove the separation of logical and physical row type. > ** Hold the event-time timestamp as regular Long field in Row > ** Represent the processing-time indicator type as a null-valued field in Row > (1 bit overhead) > * Remove materialization of event-time timestamps because timestamp is > already accessible in Row. 
> * Add {{ProcessFunction}} to set timestamp into the timestamp field of a > {{StreamRecord}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7439) Support variable arguments for UDTF
[ https://issues.apache.org/jira/browse/FLINK-7439?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu updated FLINK-7439: --- Component/s: Table API & SQL
> Support variable arguments for UDTF
>
> Key: FLINK-7439
> URL: https://issues.apache.org/jira/browse/FLINK-7439
> Project: Flink
> Issue Type: Improvement
> Components: Table API & SQL
> Reporter: Jark Wu
> Assignee: Jark Wu
>
> Currently, both UDF and UDAF support variable parameters, but UDTF does not.
-- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7439) Support variable arguments for UDTF
Jark Wu created FLINK-7439: -- Summary: Support variable arguments for UDTF Key: FLINK-7439 URL: https://issues.apache.org/jira/browse/FLINK-7439 Project: Flink Issue Type: Improvement Reporter: Jark Wu Assignee: Jark Wu Currently, both UDF and UDAF support variable parameters, but UDTF does not. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
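For context, a variable-argument UDTF eval method of the kind this issue enables might look like the sketch below. A minimal local base class stands in for Flink's `org.apache.flink.table.functions.TableFunction` so the example is self-contained; the function and method bodies are illustrative assumptions, not Flink code.

```java
import java.util.ArrayList;
import java.util.List;

/** Minimal stand-in for Flink's TableFunction base class (illustration only). */
abstract class MiniTableFunction<T> {
    private final List<T> collected = new ArrayList<>();
    protected void collect(T row) { collected.add(row); }
    public List<T> results() { return collected; }
}

/** A table function with a variable-argument eval method, as FLINK-7439 allows in SQL. */
public class SplitFunction extends MiniTableFunction<String> {
    // Java varargs: callable as eval("a"), eval("a", "b"), eval("a", "b", "c"), ...
    public void eval(String... values) {
        for (String v : values) {
            collect(v.toUpperCase());
        }
    }
}
```

With the change in PR #4536, a SQL call such as `LATERAL TABLE(split(a, b, c))` can pass any number of arguments to such an eval method, matching what FLINK-5882 already allowed in the Table API.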
[jira] [Commented] (FLINK-7358) Add implicitly converts support for User-defined function
[ https://issues.apache.org/jira/browse/FLINK-7358?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16125091#comment-16125091 ] ASF GitHub Bot commented on FLINK-7358: --- GitHub user sunjincheng121 opened a pull request: https://github.com/apache/flink/pull/4534 [FLINK-7358][table]Add implicitly converts support for User-defined function In this PR I added implicit conversion support for user-defined functions: * Updated the UDFs document. * Added implicit conversions for user-defined ScalarFunctions and TableFunctions. * Added test cases for the changes. *Thank you very much for contributing to Apache Flink - we are happy that you want to help us improve Flink. To help the community review your contribution in the best possible way, please go through the checklist below, which will get the contribution into a shape in which it can be best reviewed.* *Please understand that we do not do this to make contributions to Flink a hassle. In order to uphold a high standard of quality for code contributions, while at the same time managing a large number of contributions, we need contributors to prepare the contributions well, and give reviewers enough contextual information for the review. Please also understand that contributions that do not follow this guide will take longer to review and thus typically be picked up with lower priority by the community.* ## Contribution Checklist - Make sure that the pull request corresponds to a [JIRA issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are made for typos in JavaDoc or documentation files, which need no JIRA issue. - Name the pull request in the form "[FLINK-1234] [component] Title of the pull request", where *FLINK-1234* should be replaced by the actual issue number. Skip *component* if you are unsure about which is the best component.
Typo fixes that have no associated JIRA issue should be named following this pattern: `[hotfix] [docs] Fix typo in event time introduction` or `[hotfix] [javadocs] Expand JavaDoc for PuncuatedWatermarkGenerator`. - Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review. - Make sure that the change passes the automated tests, i.e., `mvn clean verify` passes. You can set up Travis CI to do that following [this guide](http://flink.apache.org/contribute-code.html#best-practices). - Each pull request should address only one issue, not mix up code from multiple issues. - Each commit in the pull request has a meaningful commit message (including the JIRA id) - Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below. **(The sections below can be removed for hotfixes of typos)** ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. 
*(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluser with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: ( no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) ## Documentation
[GitHub] flink pull request #4534: [FLINK-7358][table]Add implicitly converts support...
GitHub user sunjincheng121 opened a pull request: https://github.com/apache/flink/pull/4534 [FLINK-7358][table]Add implicitly converts support for User-defined function In this PR i had Add implicitly converts support for User-defined function. * Update the udfs document. * Add implicitly converts for user-defined scalarFunction and tableFunction. * Add test case for the changes. *Thank you very much for contributing to Apache Flink - we are happy that you want to help us improve Flink. To help the community review your contribution in the best possible way, please go through the checklist below, which will get the contribution into a shape in which it can be best reviewed.* *Please understand that we do not do this to make contributions to Flink a hassle. In order to uphold a high standard of quality for code contributions, while at the same time managing a large number of contributions, we need contributors to prepare the contributions well, and give reviewers enough contextual information for the review. Please also understand that contributions that do not follow this guide will take longer to review and thus typically be picked up with lower priority by the community.* ## Contribution Checklist - Make sure that the pull request corresponds to a [JIRA issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are made for typos in JavaDoc or documentation files, which need no JIRA issue. - Name the pull request in the form "[FLINK-1234] [component] Title of the pull request", where *FLINK-1234* should be replaced by the actual issue number. Skip *component* if you are unsure about which is the best component. Typo fixes that have no associated JIRA issue should be named following this pattern: `[hotfix] [docs] Fix typo in event time introduction` or `[hotfix] [javadocs] Expand JavaDoc for PuncuatedWatermarkGenerator`. - Fill out the template below to describe the changes contributed by the pull request. 
That will give reviewers the context they need to do the review. - Make sure that the change passes the automated tests, i.e., `mvn clean verify` passes. You can set up Travis CI to do that following [this guide](http://flink.apache.org/contribute-code.html#best-practices). - Each pull request should address only one issue, not mix up code from multiple issues. - Each commit in the pull request has a meaningful commit message (including the JIRA id) - Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below. **(The sections below can be removed for hotfixes of typos)** ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. 
*(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluser with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: ( no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) ## Documentation - Does this pull request introduce a new feature? (yes) - If yes, how is the feature documented? ( docs ) You can merge this pull request into a Git repository by running: $ git pull https://github.com/sunjincheng121/flink FLINK-7
[jira] [Commented] (FLINK-7358) Add implicitly converts support for User-defined function
[ https://issues.apache.org/jira/browse/FLINK-7358?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16125083#comment-16125083 ] sunjincheng commented on FLINK-7358: [~fhueske] Yes, I agree with you. I have opened the PR in Calcite and had some communication and discussion with Julian. BTW, welcome to join the discussion. :-)
> Add implicitly converts support for User-defined function
>
> Key: FLINK-7358
> URL: https://issues.apache.org/jira/browse/FLINK-7358
> Project: Flink
> Issue Type: Improvement
> Components: Table API & SQL
> Reporter: sunjincheng
> Assignee: sunjincheng
>
> Currently if a user defines a UDF as follows:
> {code}
> object Func extends ScalarFunction {
>   def eval(a: Int, b: Long): String = {
>     ...
>   }
> }
> {code}
> And if the table schema is (a: Int, b: Int, c: String), then we cannot call
> the UDF `Func('a, 'b)`. So I want to add implicit conversion when we call a
> UDF. The implicit conversion rule is:
> BYTE_TYPE_INFO -> SHORT_TYPE_INFO -> INT_TYPE_INFO -> LONG_TYPE_INFO ->
> FLOAT_TYPE_INFO -> DOUBLE_TYPE_INFO
> *Note: this JIRA covers the Table API only; SQL will be fixed in
> https://issues.apache.org/jira/browse/CALCITE-1908.*
> What do you think? [~fhueske]
-- This message was sent by Atlassian JIRA (v6.4.14#64029)
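The widening chain proposed in the issue description above amounts to an ordered list of types in which an argument may be passed wherever a later type is declared. A small sketch of that rule; type names are represented as strings for brevity, whereas Flink actually works with `TypeInformation` instances:

```java
import java.util.Arrays;
import java.util.List;

/** Sketch of the implicit widening chain proposed in FLINK-7358:
 *  BYTE -> SHORT -> INT -> LONG -> FLOAT -> DOUBLE. */
public class ImplicitWidening {
    private static final List<String> ORDER =
        Arrays.asList("BYTE", "SHORT", "INT", "LONG", "FLOAT", "DOUBLE");

    /** True if an argument of type 'from' may implicitly widen to parameter type 'to'. */
    public static boolean canWiden(String from, String to) {
        int i = ORDER.indexOf(from);
        int j = ORDER.indexOf(to);
        return i >= 0 && j >= 0 && i <= j;
    }
}
```

Under this rule, the example in the issue works: the Int column `'b` widens to the `Long` parameter of `eval(a: Int, b: Long)`, so `Func('a, 'b)` becomes callable.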
[jira] [Comment Edited] (FLINK-7358) Add implicitly converts support for User-defined function
[ https://issues.apache.org/jira/browse/FLINK-7358?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16125083#comment-16125083 ] sunjincheng edited comment on FLINK-7358 at 8/13/17 11:30 PM: -- [~fhueske] Yes, Agree with you. I had open the JIRA. in calcite(https://issues.apache.org/jira/browse/CALCITE-1908). And do some communication and discussion with julian. BTW. welcome you to join in the discussion. :-). was (Author: sunjincheng121): [~fhueske] Yes, Agree with you. I had open the PR in calcite(https://issues.apache.org/jira/browse/CALCITE-1908). And do some communication and discussion with julian. BTW. welcome you to join in the discussion. :-). > Add implicitly converts support for User-defined function > -- > > Key: FLINK-7358 > URL: https://issues.apache.org/jira/browse/FLINK-7358 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: sunjincheng >Assignee: sunjincheng > > Currently if user defined a UDF as follows: > {code} > object Func extends ScalarFunction { > def eval(a: Int, b: Long): String = { > ... > } > } > {code} > And if the table schema is (a: Int, b: int, c: String), then we can not call > the UDF `Func('a, 'b)`. So > I want add implicitly converts when we call UDF. The implicitly convert rule > is: > BYTE_TYPE_INFO -> SHORT_TYPE_INFO -> INT_TYPE_INFO -> LONG_TYPE_INFO -> > FLOAT_TYPE_INFO -> DOUBLE_TYPE_INFO > *Note: > In this JIRA. only for TableAPI, And SQL will be fixed in > https://issues.apache.org/jira/browse/CALCITE-1908.* > What do you think? [~fhueske] -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7396) Don't put multiple directories in HADOOP_CONF_DIR in config.sh
[ https://issues.apache.org/jira/browse/FLINK-7396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16125069#comment-16125069 ] ASF GitHub Bot commented on FLINK-7396: --- Github user zjureel closed the pull request at: https://github.com/apache/flink/pull/4511
> Don't put multiple directories in HADOOP_CONF_DIR in config.sh
>
> Key: FLINK-7396
> URL: https://issues.apache.org/jira/browse/FLINK-7396
> Project: Flink
> Issue Type: Bug
> Components: Startup Shell Scripts
> Affects Versions: 1.4.0, 1.3.2
> Reporter: Aljoscha Krettek
> Assignee: Fang Yong
> Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
> In config.sh we do this:
> {code}
> # Check if deprecated HADOOP_HOME is set.
> if [ -n "$HADOOP_HOME" ]; then
>     # HADOOP_HOME is set. Check if its a Hadoop 1.x or 2.x HADOOP_HOME path
>     if [ -d "$HADOOP_HOME/conf" ]; then
>         # its a Hadoop 1.x
>         HADOOP_CONF_DIR="$HADOOP_CONF_DIR:$HADOOP_HOME/conf"
>     fi
>     if [ -d "$HADOOP_HOME/etc/hadoop" ]; then
>         # Its Hadoop 2.2+
>         HADOOP_CONF_DIR="$HADOOP_CONF_DIR:$HADOOP_HOME/etc/hadoop"
>     fi
> fi
> {code}
> while our {{HadoopFileSystem}} actually treats this as a single path, not a
> colon-separated list of paths:
> https://github.com/apache/flink/blob/854b05376a459a6197e41e141bb28a9befe481ad/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java#L236
> I also think that other tools don't assume multiple paths in there, and at
> least one user ran into the problem on their setup.
-- This message was sent by Atlassian JIRA (v6.4.14#64029)
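To illustrate the bug described above: the config.sh snippet can produce a value like `/opt/hadoop/conf:/opt/hadoop/etc/hadoop` (or one with a leading `:` when `HADOOP_CONF_DIR` started empty), while `HadoopFileSystem` looks the value up verbatim as a single directory, which then never exists. A hypothetical consumer-side split, not Flink code:

```java
/** Sketch: splitting a possibly colon-joined HADOOP_CONF_DIR into usable entries. */
public class ConfDirResolver {
    /** Split a possibly colon-separated conf-dir value, dropping empty entries. */
    public static String[] entries(String hadoopConfDir) {
        // appending to an initially empty HADOOP_CONF_DIR leaves a leading ':',
        // which produces an empty first token; filter those out
        return java.util.Arrays.stream(hadoopConfDir.split(":"))
                .filter(s -> !s.isEmpty())
                .toArray(String[]::new);
    }
}
```

The fix chosen for the issue went the other way: config.sh now sets a single directory instead of joining several, so consumers like `HadoopFileSystem` never see a colon-separated value.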
[jira] [Commented] (FLINK-7396) Don't put multiple directories in HADOOP_CONF_DIR in config.sh
[ https://issues.apache.org/jira/browse/FLINK-7396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16125068#comment-16125068 ] ASF GitHub Bot commented on FLINK-7396: --- Github user zjureel commented on the issue: https://github.com/apache/flink/pull/4511 Thank you for merging it > Don't put multiple directories in HADOOP_CONF_DIR in config.sh > -- > > Key: FLINK-7396 > URL: https://issues.apache.org/jira/browse/FLINK-7396 > Project: Flink > Issue Type: Bug > Components: Startup Shell Scripts >Affects Versions: 1.4.0, 1.3.2 >Reporter: Aljoscha Krettek >Assignee: Fang Yong >Priority: Blocker > Fix For: 1.4.0, 1.3.3 > > > In config.sh we do this: > {code} > # Check if deprecated HADOOP_HOME is set. > if [ -n "$HADOOP_HOME" ]; then > # HADOOP_HOME is set. Check if its a Hadoop 1.x or 2.x HADOOP_HOME path > if [ -d "$HADOOP_HOME/conf" ]; then > # its a Hadoop 1.x > HADOOP_CONF_DIR="$HADOOP_CONF_DIR:$HADOOP_HOME/conf" > fi > if [ -d "$HADOOP_HOME/etc/hadoop" ]; then > # Its Hadoop 2.2+ > HADOOP_CONF_DIR="$HADOOP_CONF_DIR:$HADOOP_HOME/etc/hadoop" > fi > fi > {code} > while our {{HadoopFileSystem}} actually only treats this paths as a single > path, not a colon-separated path: > https://github.com/apache/flink/blob/854b05376a459a6197e41e141bb28a9befe481ad/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java#L236 > I also think that other tools don't assume multiple paths in there and at > least one user ran into the problem on their setup. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16125061#comment-16125061 ] ASF GitHub Bot commented on FLINK-7337: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4532#discussion_r132854814 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/OutputRowtimeProcessFunction.scala --- @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime + +import org.apache.calcite.runtime.SqlFunctions +import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.util.Collector + +/** + * Wraps a ProcessFunction and sets a Timestamp field of a CRow as + * [[org.apache.flink.streaming.runtime.streamrecord.StreamRecord]] timestamp. 
+ */ +class OutputRowtimeProcessFunction[OUT]( +function: MapFunction[CRow, OUT], +rowtimeIdx: Int) --- End diff -- in fact the data type is changed to keep the data type unchanged. A time indicator is externally represented and treated as a `TIMESTAMP` field and only internally handled as `LONG`. Therefore, we need to convert it into a `TIMESTAMP` once the result is converted into a `DataStream`. You are right, that we need to convert all time indicators to `TIMESTAMP` and not only one. This is currently enforced by the exception that you observed. Currently users have to cast all but one time indicator attributes to `TIMESTAMP`. That will also convert them from `long` to `Timestamp`. > Refactor handling of time indicator attributes > -- > > Key: FLINK-7337 > URL: https://issues.apache.org/jira/browse/FLINK-7337 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Fabian Hueske >Assignee: Fabian Hueske > > After a [discussion on the dev mailing > list|https://lists.apache.org/thread.html/735d55f9022df8ff73566a9f1553e14be94f8443986ad46559b35869@%3Cdev.flink.apache.org%3E] > I propose the following changes to the current handling of time indicator > attributes: > * Remove the separation of logical and physical row type. > ** Hold the event-time timestamp as regular Long field in Row > ** Represent the processing-time indicator type as a null-valued field in Row > (1 bit overhead) > * Remove materialization of event-time timestamps because timestamp is > already accessible in Row. > * Add {{ProcessFunction}} to set timestamp into the timestamp field of a > {{StreamRecord}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
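The internal-`LONG` / external-`TIMESTAMP` handling that fhueske describes above can be illustrated with a small round-trip sketch. Note one simplification: the actual code path goes through Calcite's `SqlFunctions` (imported in the diff), whose conversion also accounts for time zones, which this sketch ignores.

```java
import java.sql.Timestamp;

/** Sketch: a time indicator is held internally as a long (epoch millis) and
 *  materialized as a TIMESTAMP when the result leaves the Table API. */
public class TimeIndicatorConversion {
    /** Internal -> external: materialize the long rowtime as a SQL TIMESTAMP. */
    public static Timestamp toExternal(long internalMillis) {
        return new Timestamp(internalMillis);
    }

    /** External -> internal: back to the long used on the per-record code path. */
    public static long toInternal(Timestamp external) {
        return external.getTime();
    }
}
```

Keeping the long representation on the per-record path avoids object allocation for every record; the conversion to `Timestamp` happens only once, at the boundary where the `Table` is converted into a `DataStream`.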
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16125053#comment-16125053 ] ASF GitHub Bot commented on FLINK-7337: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4532#discussion_r132854238 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala --- @@ -667,30 +719,62 @@ abstract class StreamTableEnvironment( // get CRow plan val plan: DataStream[CRow] = translateToCRow(logicalPlan, queryConfig) +val rowtimeFields = logicalType + .getFieldList.asScala + .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType)) + +// convert the input type for the conversion mapper +// the input will be changed in the OutputRowtimeProcessFunction later +val convType = if (rowtimeFields.size > 1) { + throw new TableException( --- End diff -- I think we should avoid implicit defaults like using the timestamp attribute of the leftmost table (the leftmost table might not have a time indicator attribute, and join order optimization would change the order of tables) and special cases for queries like `SELECT *`. When a `Table` is converted into a `DataStream`, it is likely that the resulting stream is further processed by logic that cannot be expressed in SQL / Table API. If a `Table` has multiple timestamp attributes, IMO a user should be forced to make a choice for the `StreamRecord` timestamp, because the semantics of any subsequent time-based operations will depend on that. I see two ways to do that:
- ensure that only one attribute is a time indicator by casting the others to `TIMESTAMP`
- let the user specify which field should be used as the timestamp, as an additional parameter of the `toAppendStream` and `toRetractStream` methods.
We could also do both. I agree with @wuchong that we do not need this restriction when we emit a `Table` to a `TableSink`.
> Refactor handling of time indicator attributes > -- > > Key: FLINK-7337 > URL: https://issues.apache.org/jira/browse/FLINK-7337 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Fabian Hueske >Assignee: Fabian Hueske > > After a [discussion on the dev mailing > list|https://lists.apache.org/thread.html/735d55f9022df8ff73566a9f1553e14be94f8443986ad46559b35869@%3Cdev.flink.apache.org%3E] > I propose the following changes to the current handling of time indicator > attributes: > * Remove the separation of logical and physical row type. > ** Hold the event-time timestamp as regular Long field in Row > ** Represent the processing-time indicator type as a null-valued field in Row > (1 bit overhead) > * Remove materialization of event-time timestamps because timestamp is > already accessible in Row. > * Add {{ProcessFunction}} to set timestamp into the timestamp field of a > {{StreamRecord}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
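The restriction discussed above (force the user to narrow multiple time indicators down to one before converting to a `DataStream`) boils down to a simple guard on the rowtime fields. A hedged sketch with hypothetical names, mirroring the `TableException` in the quoted diff rather than the actual `StreamTableEnvironment` code:

```java
import java.util.Arrays;
import java.util.List;

public class RowtimeGuard {

    /**
     * Returns the single rowtime field, or throws if the choice is ambiguous.
     * Mirrors the check in the quoted diff: with more than one indicator, the
     * user must cast all but one to TIMESTAMP (or pick one explicitly).
     */
    static String requireSingleRowtime(List<String> rowtimeFields) {
        if (rowtimeFields.size() > 1) {
            throw new IllegalStateException(
                "Found more than one rowtime field: " + rowtimeFields);
        }
        return rowtimeFields.isEmpty() ? null : rowtimeFields.get(0);
    }

    public static void main(String[] args) {
        System.out.println(requireSingleRowtime(Arrays.asList("rowtime"))); // rowtime
    }
}
```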
[jira] [Commented] (FLINK-7423) Always reuse an instance to get elements from the inputFormat
[ https://issues.apache.org/jira/browse/FLINK-7423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16125010#comment-16125010 ] ASF GitHub Bot commented on FLINK-7423: --- Github user greghogan commented on the issue: https://github.com/apache/flink/pull/4525 `MutableObjectIterator#next` allows/requires a `null` result: "@return The object or null if the iterator is exhausted.". The documentation could be improved, e.g. in `MergeIterator` the returned object is not necessarily the immediately passed argument. `DataSourceTask` looks to be the only `InputFormat` consumer re-passing a reuse object. It seems the reason to allow returning `null` is handling, for example, a bad record at the end of a file, such that `reachedEnd` would have returned `false` but there is no record to return. Object reuse is always optional so is not an issue for immutable types. > Always reuse an instance to get elements from the inputFormat > --- > > Key: FLINK-7423 > URL: https://issues.apache.org/jira/browse/FLINK-7423 > Project: Flink > Issue Type: Bug > Components: DataStream API >Reporter: Xu Pingyong >Assignee: Xu Pingyong > > In InputFormatSourceFunction.java: > {code:java} > OUT nextElement = serializer.createInstance(); > while (isRunning) { > format.open(splitIterator.next()); > // for each element we also check if cancel > // was called by checking the isRunning flag > while (isRunning && !format.reachedEnd()) { > nextElement = > format.nextRecord(nextElement); > if (nextElement != null) { > ctx.collect(nextElement); > } else { > break; > } > } > format.close(); > completedSplitsCounter.inc(); > if (isRunning) { > isRunning = splitIterator.hasNext(); > } > } > {code} > the format may return other element or null when nextRecord, that will may > cause exception. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
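The reuse contract being debated (a format's `nextRecord` may hand back the reuse object, a fresh object, or `null` when no record can be produced even though `reachedEnd()` returned `false`) can be sketched with a toy format. These are hypothetical stand-in types, not Flink's actual `InputFormat`:

```java
import java.util.ArrayList;
import java.util.List;

public class ReuseLoopSketch {

    /** Toy stand-in for InputFormat: nextRecord may reuse, replace, or return null. */
    interface ToyFormat {
        boolean reachedEnd();
        int[] nextRecord(int[] reuse);
    }

    /** Mirrors the quoted InputFormatSourceFunction loop: the reuse object is re-passed. */
    static List<Integer> drain(ToyFormat format) {
        List<Integer> out = new ArrayList<>();
        int[] record = new int[1]; // corresponds to serializer.createInstance()
        while (!format.reachedEnd()) {
            record = format.nextRecord(record);
            if (record != null) {
                out.add(record[0]);
            } else {
                // reachedEnd() was false but no record could be produced
                // (e.g. a bad trailing record) -- null ends the split.
                break;
            }
        }
        return out;
    }

    /** Builds a toy format over the given values and drains it. */
    static List<Integer> drainOf(final int... values) {
        return drain(new ToyFormat() {
            int i = 0;
            public boolean reachedEnd() { return i >= values.length; }
            public int[] nextRecord(int[] reuse) { reuse[0] = values[i++]; return reuse; }
        });
    }

    public static void main(String[] args) {
        System.out.println(drainOf(1, 2, 3)); // [1, 2, 3]
    }
}
```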
[jira] [Updated] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers
[ https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang updated FLINK-7378: Description: Currently the number of network buffers in {{LocalBufferPool}} for {{SingleInputGate}} is limited by {{a * + b}}, where a is the number of exclusive buffers for each channel and b is the number of floating buffers shared by all channels. Considering the credit-based flow control feature, we want to create a fix size buffer pool used to manage the floating buffers for {{SingleInputGate}}. And the exclusive buffers are assigned to {{InputChannel}}s directly. was: Currently the number of network buffers in {{LocalBufferPool}} for {{SingleInputGate}} is limited by {{a * + b}}, where a is the number of exclusive buffers for each channel and b is the number of floating buffers shared by all channels. Considering the credit-based flow control feature, we want to implement a new fixed size buffer pool type used to manage the floating buffers for {{SingleInputGate}}. Compared with {{LocalBufferPool}}, this is a non-rebalancing buffer pool which will not participate in redistributing the left available buffers in {{NetworkBufferPool}}. > Create a fix size (non rebalancing) buffer pool type for the floating buffers > - > > Key: FLINK-7378 > URL: https://issues.apache.org/jira/browse/FLINK-7378 > Project: Flink > Issue Type: Sub-task > Components: Core >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.4.0 > > > Currently the number of network buffers in {{LocalBufferPool}} for > {{SingleInputGate}} is limited by {{a * + b}}, where a > is the number of exclusive buffers for each channel and b is the number of > floating buffers shared by all channels. > Considering the credit-based flow control feature, we want to create a fix > size buffer pool used to manage the floating buffers for {{SingleInputGate}}. > And the exclusive buffers are assigned to {{InputChannel}}s directly. 
[jira] [Updated] (FLINK-7378) Create a fix size (non rebalancing) buffer pool type for the floating buffers
[ https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang updated FLINK-7378: Summary: Create a fix size (non rebalancing) buffer pool type for the floating buffers (was: Implement the FixedBufferPool for floating buffers of SingleInputGate) > Create a fix size (non rebalancing) buffer pool type for the floating buffers > - > > Key: FLINK-7378 > URL: https://issues.apache.org/jira/browse/FLINK-7378 > Project: Flink > Issue Type: Sub-task > Components: Core >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.4.0 > > > Currently the number of network buffers in {{LocalBufferPool}} for > {{SingleInputGate}} is limited by {{a * + b}}, where a > is the number of exclusive buffers for each channel and b is the number of > floating buffers shared by all channels. > Considering the credit-based flow control feature, we want to implement a new > fixed size buffer pool type used to manage the floating buffers for > {{SingleInputGate}}. > Compared with {{LocalBufferPool}}, this is a non-rebalancing buffer pool > which will not participate in redistributing the left available buffers in > {{NetworkBufferPool}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
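The fixed-size, non-rebalancing pool described above (a fixed budget of floating buffers that never participates in redistributing buffers from the `NetworkBufferPool`) can be sketched as a bounded free list. This is illustrative only, under the assumption of plain byte arrays as buffers, not the proposed Flink implementation:

```java
import java.util.ArrayDeque;

public class FixedBufferPool {

    private final ArrayDeque<byte[]> free = new ArrayDeque<>();

    /** The pool size is fixed at construction; it never grows or shrinks ("non rebalancing"). */
    FixedBufferPool(int numBuffers, int bufferSize) {
        for (int i = 0; i < numBuffers; i++) {
            free.push(new byte[bufferSize]);
        }
    }

    /** Returns a floating buffer, or null if all buffers are currently handed out. */
    byte[] request() {
        return free.poll();
    }

    /** Returns a buffer to the pool so other channels can reuse it. */
    void recycle(byte[] buffer) {
        free.push(buffer);
    }

    int available() {
        return free.size();
    }

    public static void main(String[] args) {
        FixedBufferPool pool = new FixedBufferPool(2, 1024);
        byte[] b = pool.request();
        System.out.println(pool.available()); // 1
        pool.recycle(b);
        System.out.println(pool.available()); // 2
    }
}
```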
[jira] [Commented] (FLINK-5886) Python API for streaming applications
[ https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16124952#comment-16124952 ] ASF GitHub Bot commented on FLINK-5886: --- Github user zohar-mizrahi commented on a diff in the pull request: https://github.com/apache/flink/pull/3838#discussion_r132843982 --- Diff: flink-libraries/flink-streaming-python/src/main/java/org/apache/flink/streaming/python/api/environment/PythonStreamExecutionEnvironment.java --- @@ -0,0 +1,421 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.python.api.environment; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.cache.DistributedCache; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.filecache.FileCache; +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.streaming.api.environment.LocalStreamEnvironment; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.python.api.datastream.PythonDataStream; +import org.apache.flink.streaming.python.api.functions.PythonGeneratorFunction; +import org.apache.flink.streaming.python.api.functions.PythonIteratorFunction; +import org.apache.flink.streaming.python.api.functions.UtilityFunctions; +import org.apache.flink.streaming.python.util.serialization.PyObjectSerializer; +import org.python.core.PyObject; +import org.python.core.PyString; +import org.python.core.PyInteger; +import org.python.core.PyLong; +import org.python.core.PyUnicode; +import org.python.core.PyTuple; +import org.python.core.PyObjectDerived; +import org.python.core.PyInstance; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.file.Paths; +import java.util.Collection; +import java.util.Iterator; +import java.util.Random; + + +/** + * A thin wrapper layer over {@link StreamExecutionEnvironment}. + * + * The PythonStreamExecutionEnvironment is the context in which a streaming program is executed. 
+ * + * + * The environment provides methods to control the job execution (such as setting the parallelism + * or the fault tolerance/checkpointing parameters) and to interact with the outside world + * (data access). + */ +@PublicEvolving +public class PythonStreamExecutionEnvironment { + private static final Logger LOG = LoggerFactory.getLogger(PythonStreamExecutionEnvironment.class); + private final StreamExecutionEnvironment env; + + /** +* A thin wrapper layer over {@link StreamExecutionEnvironment#getExecutionEnvironment()}. In addition it takes +* care for required Jython serializers registration. +* +* @return The python execution environment of the context in which the program is +* executed. +*/ + public static PythonStreamExecutionEnvironment get_execution_environment() { + return new PythonStreamExecutionEnvironment(StreamExecutionEnvironment.getExecutionEnvironment()); + } + + /** +* Creates a {@link LocalStreamEnvironment}. The local execution environment +* will run the program in a multi-threaded fashion in the same JVM as the +* environment was created in. The default parallelism of the local +* environment is the number of hardware contexts (CPU cores / threads), +* unless it was specified differently
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16124929#comment-16124929 ] ASF GitHub Bot commented on FLINK-7337: --- Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/4532#discussion_r132842811 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala --- @@ -719,33 +715,47 @@ abstract class TableEnvironment(val config: TableConfig) { throw new TableException("Field name can not be '*'.") } -(fieldNames.toArray, fieldIndexes.toArray) +(fieldNames.toArray, fieldIndexes.toArray) // build fails if not converted to array --- End diff -- In my local environment, `toArray` also seems to be redundant. > Refactor handling of time indicator attributes > -- > > Key: FLINK-7337 > URL: https://issues.apache.org/jira/browse/FLINK-7337 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Fabian Hueske >Assignee: Fabian Hueske > > After a [discussion on the dev mailing > list|https://lists.apache.org/thread.html/735d55f9022df8ff73566a9f1553e14be94f8443986ad46559b35869@%3Cdev.flink.apache.org%3E] > I propose the following changes to the current handling of time indicator > attributes: > * Remove the separation of logical and physical row type. > ** Hold the event-time timestamp as regular Long field in Row > ** Represent the processing-time indicator type as a null-valued field in Row > (1 bit overhead) > * Remove materialization of event-time timestamps because timestamp is > already accessible in Row. > * Add {{ProcessFunction}} to set timestamp into the timestamp field of a > {{StreamRecord}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-5886) Python API for streaming applications
[ https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16124928#comment-16124928 ] ASF GitHub Bot commented on FLINK-5886: --- Github user zohar-mizrahi commented on a diff in the pull request: https://github.com/apache/flink/pull/3838#discussion_r132842489 --- Diff: flink-libraries/flink-streaming-python/src/test/java/org/apache/flink/streaming/python/api/PythonStreamBinderTest.java --- @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.python.api; + +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.streaming.util.StreamingProgramTestBase; +import org.apache.flink.streaming.python.api.PythonStreamBinder; + +import java.io.File; +import java.io.FileNotFoundException; +import java.util.ArrayList; +import java.util.List; +import java.util.ListIterator; + +public class PythonStreamBinderTest extends StreamingProgramTestBase { + final private static String defaultPythonScriptName = "run_all_tests.py"; + final private static String flinkPythonRltvPath = "flink-libraries/flink-streaming-python"; + final private static String pathToStreamingTests = "src/test/python/org/apache/flink/streaming/python/api"; + + public PythonStreamBinderTest() { + } + + public static void main(String[] args) throws Exception { + if (args.length == 0) { + args = prepareDefaultArgs(); + } else { + args[0] = findStreamTestFile(args[0]).getAbsolutePath(); + } + PythonStreamBinder.main(args); + } + + @Override + public void testProgram() throws Exception { + this.main(new String[]{}); + } + + private static String[] prepareDefaultArgs() throws Exception { + File testFullPath = findStreamTestFile(defaultPythonScriptName); --- End diff -- I agree that the function names here are a bit confusing: in essence this function locates a single test file, while `getFilesInFolder` in the next code line collects files that start with `test_`, so the main test file `run_all_tests.py` is filtered out and not included. To make this more readable and robust, I changed `getFilesInFolder` to accept an additional `excludes` argument and call it with the main test file in `excludes`.
> Python API for streaming applications > - > > Key: FLINK-5886 > URL: https://issues.apache.org/jira/browse/FLINK-5886 > Project: Flink > Issue Type: New Feature > Components: Python API >Reporter: Zohar Mizrahi >Assignee: Zohar Mizrahi > > A work in progress to provide python interface for Flink streaming APIs. The > core technology is based on jython and thus imposes two limitations: a. user > defined functions cannot use python extensions. b. the python version is 2.x > The branch is based on Flink release 1.2.0, as can be found here: > https://github.com/zohar-pm/flink/tree/python-streaming > In order to test it, someone can use IntelliJ IDE. Assuming IntelliJ was > setup properly (see: > https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/ide_setup.html), > one can run/debug {{org.apache.flink.python.api.PythonStreamBinderTest}}, > which in return will execute all the tests under > {{/Users/zohar/dev/pm-flink/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/streaming}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
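The change described above (collect `test_*` files while excluding the driver script) is a small filter. A hedged sketch with hypothetical signatures, operating on plain file names rather than the actual `getFilesInFolder`:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class TestFileFilter {

    /** Keeps names starting with `prefix`, minus anything listed in `excludes`. */
    static List<String> filesWithPrefix(List<String> names, String prefix, Set<String> excludes) {
        List<String> out = new ArrayList<>();
        for (String n : names) {
            if (n.startsWith(prefix) && !excludes.contains(n)) {
                out.add(n);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> names =
            Arrays.asList("run_all_tests.py", "test_map.py", "test_filter.py", "util.py");
        // The driver script is excluded explicitly, for robustness even if it
        // were ever renamed to match the test_ prefix.
        Set<String> excludes = new HashSet<>(Arrays.asList("run_all_tests.py"));
        System.out.println(filesWithPrefix(names, "test_", excludes)); // [test_map.py, test_filter.py]
    }
}
```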
[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications
Github user zohar-mizrahi commented on a diff in the pull request: https://github.com/apache/flink/pull/3838#discussion_r132842489

--- Diff: flink-libraries/flink-streaming-python/src/test/java/org/apache/flink/streaming/python/api/PythonStreamBinderTest.java ---
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.python.api;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.streaming.python.api.PythonStreamBinder;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+
+public class PythonStreamBinderTest extends StreamingProgramTestBase {
+	final private static String defaultPythonScriptName = "run_all_tests.py";
+	final private static String flinkPythonRltvPath = "flink-libraries/flink-streaming-python";
+	final private static String pathToStreamingTests = "src/test/python/org/apache/flink/streaming/python/api";
+
+	public PythonStreamBinderTest() {
+	}
+
+	public static void main(String[] args) throws Exception {
+		if (args.length == 0) {
+			args = prepareDefaultArgs();
+		} else {
+			args[0] = findStreamTestFile(args[0]).getAbsolutePath();
+		}
+		PythonStreamBinder.main(args);
+	}
+
+	@Override
+	public void testProgram() throws Exception {
+		this.main(new String[]{});
+	}
+
+	private static String[] prepareDefaultArgs() throws Exception {
+		File testFullPath = findStreamTestFile(defaultPythonScriptName);
--- End diff --

I agree that the function names here are a bit confusing. In essence, this function locates a single test file, while `getFilesInFolder` on the next code line collects the files that start with `test_`, so the main test file `run_all_tests.py` is filtered out and not included. To make this more readable and robust, I changed `getFilesInFolder` to accept one more argument, `excludes`, and to be called with the main test file in the `excludes` argument.

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well.
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
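The `excludes` change described in the review comment can be sketched with the JDK alone. The class and method names below are hypothetical stand-ins for the `getFilesInFolder` helper discussed above, not Flink's actual test code:

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

public class TestFileCollector {

    // Hypothetical analogue of the getFilesInFolder change: collect the files in
    // `folder` whose names start with `prefix`, skipping any name listed in
    // `excludes` (e.g. the main test file run_all_tests.py).
    public static List<File> getFilesInFolder(File folder, String prefix, Set<String> excludes) {
        List<File> result = new ArrayList<>();
        File[] entries = folder.listFiles();
        if (entries == null) {
            return result; // not a directory, or an I/O error occurred
        }
        for (File f : entries) {
            if (f.isFile() && f.getName().startsWith(prefix) && !excludes.contains(f.getName())) {
                result.add(f);
            }
        }
        return result;
    }
}
```

Passing the main test file in `excludes` keeps the behavior correct even if that file is later renamed to something matching the `test_` prefix.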
[jira] [Commented] (FLINK-5886) Python API for streaming applications
[ https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16124914#comment-16124914 ] ASF GitHub Bot commented on FLINK-5886: --- Github user zohar-mizrahi commented on a diff in the pull request: https://github.com/apache/flink/pull/3838#discussion_r132841381 --- Diff: flink-libraries/flink-streaming-python/pom.xml --- @@ -0,0 +1,104 @@ + +http://maven.apache.org/POM/4.0.0"; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; +xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd";> + 4.0.0 + + + org.apache.flink + flink-libraries + 1.4-SNAPSHOT + .. + + + flink-streaming-python_${scala.binary.version} + flink-streaming-python + jar + + + + + org.apache.maven.plugins + maven-jar-plugin + + + jar-with-dependencies + + + + true + org.apache.flink.streaming.python.api.PythonStreamBinder + + + + + + + + + + + + + org.apache.flink + flink-core + ${project.version} +provided + + + org.apache.flink + flink-java + ${project.version} +provided + + + org.apache.flink + flink-streaming-java_${scala.binary.version} + ${project.version} +provided + + + org.apache.flink + flink-runtime_${scala.binary.version} + ${project.version} +provided + + + org.python + jython-standalone + 2.7.0 + + + org.apache.flink + flink-connector-kafka-0.9_2.10 --- End diff -- Done for the Scala version. As for not being able to run the tests: 1. I fixed the issue with the ```TypeError: object of type 'java.lang.Class' has no len()```. 2. I still can't reproduce the main issue, concerning an import of java class from the Python module: File: ```flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/utils/python_test_base.py``` Line:19: ```from org.apache.flink.api.java.utils import ParameterTool``` The given class (ParameterTool) resides in different project `flink-java` and the jython module cannot find it. 
It is probably related to the CLASSPATH. Any suggestion for how to reproduce it?
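One way to test the CLASSPATH theory is to probe, from inside the failing JVM, whether the class is loadable at all. This is a generic sketch; running it with `"org.apache.flink.api.java.utils.ParameterTool"` in the affected test setup (an assumption about that environment, since Flink is not on this sketch's classpath) would show whether `flink-java` is visible to Jython's interpreter:

```java
public class ClasspathProbe {

    // Returns true if the named class can be resolved by this class's loader,
    // without initializing it. A `false` for ParameterTool in the failing JVM
    // would confirm that flink-java is missing from the test classpath.
    public static boolean isLoadable(String className) {
        try {
            Class.forName(className, false, ClasspathProbe.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }
}
```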
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16124908#comment-16124908 ] ASF GitHub Bot commented on FLINK-7337:

Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/4532#discussion_r132834552

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---
@@ -667,30 +719,62 @@ abstract class StreamTableEnvironment(
 // get CRow plan
 val plan: DataStream[CRow] = translateToCRow(logicalPlan, queryConfig)
+val rowtimeFields = logicalType
+  .getFieldList.asScala
+  .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
+
+// convert the input type for the conversion mapper
+// the input will be changed in the OutputRowtimeProcessFunction later
+val convType = if (rowtimeFields.size > 1) {
+  throw new TableException(
+    s"Found more than one rowtime field: [${rowtimeFields.map(_.getName).mkString(", ")}] in " +
+      s"the table that should be converted to a DataStream.\n" +
+      s"Please select the rowtime field that should be used as event-time timestamp for the " +
+      s"DataStream by casting all other fields to TIMESTAMP.")
+} else if (rowtimeFields.size == 1) {
+  val origRowType = plan.getType.asInstanceOf[CRowTypeInfo].rowType
+  val convFieldTypes = origRowType.getFieldTypes.map { t =>
+    if (FlinkTypeFactory.isRowtimeIndicatorType(t)) {
--- End diff --

.
> Refactor handling of time indicator attributes > -- > > Key: FLINK-7337 > URL: https://issues.apache.org/jira/browse/FLINK-7337 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Fabian Hueske >Assignee: Fabian Hueske > > After a [discussion on the dev mailing > list|https://lists.apache.org/thread.html/735d55f9022df8ff73566a9f1553e14be94f8443986ad46559b35869@%3Cdev.flink.apache.org%3E] > I propose the following changes to the current handling of time indicator > attributes: > * Remove the separation of logical and physical row type. > ** Hold the event-time timestamp as regular Long field in Row > ** Represent the processing-time indicator type as a null-valued field in Row > (1 bit overhead) > * Remove materialization of event-time timestamps because timestamp is > already accessible in Row. > * Add {{ProcessFunction}} to set timestamp into the timestamp field of a > {{StreamRecord}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16124909#comment-16124909 ] ASF GitHub Bot commented on FLINK-7337:

Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/4532#discussion_r132839934

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/conversion/CRowToJavaTupleMapRunner.scala ---
@@ -16,32 +16,32 @@
  * limitations under the License.
  */

-package org.apache.flink.table.runtime
+package org.apache.flink.table.runtime.conversion

 import java.lang.{Boolean => JBool}

 import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.table.codegen.Compiler
 import org.apache.flink.table.runtime.types.CRow
 import org.apache.flink.types.Row
-import org.slf4j.LoggerFactory
-import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.slf4j.{Logger, LoggerFactory}

 /**
- * Convert [[CRow]] to a [[JTuple2]]
+ * Convert [[CRow]] to a [[JTuple2]].
  */
-class CRowInputJavaTupleOutputMapRunner(
+class CRowToJavaTupleMapRunner(
     name: String,
     code: String,
     @transient var returnType: TypeInformation[JTuple2[JBool, Any]])
   extends RichMapFunction[CRow, Any]
   with ResultTypeQueryable[JTuple2[JBool, Any]]
   with Compiler[MapFunction[Row, Any]] {
--- End diff --

indent
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16124911#comment-16124911 ] ASF GitHub Bot commented on FLINK-7337:

Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/4532#discussion_r132841043

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---
@@ -667,30 +719,62 @@ abstract class StreamTableEnvironment(
 // get CRow plan
 val plan: DataStream[CRow] = translateToCRow(logicalPlan, queryConfig)
+val rowtimeFields = logicalType
+  .getFieldList.asScala
+  .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
+
+// convert the input type for the conversion mapper
+// the input will be changed in the OutputRowtimeProcessFunction later
+val convType = if (rowtimeFields.size > 1) {
+  throw new TableException(
--- End diff --

I'm fine with both the current approach and using the time indicator of the left table as the default. But I think no exception should be thrown when writing a Table to a TableSink; currently, both cases share the same exception code path.
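The single-rowtime-field check discussed in the diff above can be sketched in isolation. The class and method names here are hypothetical simplifications of the Scala `convType` logic, operating only on the names of fields flagged as rowtime indicators:

```java
import java.util.List;

public class RowtimeCheck {

    // Hypothetical simplification of the convType logic: at most one rowtime
    // indicator field may remain when converting a table to a DataStream;
    // with more than one, fail and ask the user to cast the extras to TIMESTAMP.
    public static String selectRowtimeField(List<String> rowtimeFields) {
        if (rowtimeFields.size() > 1) {
            throw new IllegalStateException(
                "Found more than one rowtime field: [" + String.join(", ", rowtimeFields) + "] "
                + "in the table that should be converted to a DataStream. Please select the "
                + "rowtime field that should be used as the event-time timestamp by casting "
                + "all other fields to TIMESTAMP.");
        }
        return rowtimeFields.isEmpty() ? null : rowtimeFields.get(0);
    }
}
```

The review comment argues that this exception is appropriate for DataStream conversion but should not fire on the TableSink path, which in the current code shares this check.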
[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes
[ https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16124910#comment-16124910 ] ASF GitHub Bot commented on FLINK-7337:

Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/4532#discussion_r132840719

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---
@@ -719,33 +715,47 @@ abstract class TableEnvironment(val config: TableConfig) {
       throw new TableException("Field name can not be '*'.")
     }

-    (fieldNames.toArray, fieldIndexes.toArray)
+    (fieldNames.toArray, fieldIndexes.toArray) // build fails if not converted to array
--- End diff --

It builds successfully when I remove the `toArray` in my local environment.
[jira] [Assigned] (FLINK-7435) FsStateBackend with incremental backup enable does not work with Keyed CEP
[ https://issues.apache.org/jira/browse/FLINK-7435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kostas Kloudas reassigned FLINK-7435:
-
Assignee: Kostas Kloudas

> FsStateBackend with incremental backup enable does not work with Keyed CEP
> --
>
> Key: FLINK-7435
> URL: https://issues.apache.org/jira/browse/FLINK-7435
> Project: Flink
> Issue Type: Bug
> Components: CEP
> Affects Versions: 1.3.1, 1.3.2
> Environment: AWS EMR YARN; uses CEP with the pattern start -> next
> (oneOrMore.Optional.Consecutive) -> next(end), stored with FsStateBackend
> with the incremental option enabled.
> Reporter: daiqing
> Assignee: Kostas Kloudas
>
> java.lang.RuntimeException: Exception occurred while processing valve output watermark:
> 	at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
> 	at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
> 	at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
> 	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
> 	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> 	at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: Could not copy NFA.
> 	at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:908)
> 	at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:852)
> 	at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:279)
> 	at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:296)
> 	at org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:55)
> 	at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.getNFA(AbstractKeyedCEPPatternOperator.java:268)
> 	at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:230)
> 	at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
> 	at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946)
> 	at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
> 	... 7 more
> Caused by: java.io.StreamCorruptedException: invalid type code: 00
> 	at java.io.ObjectInputStream.readTypeString(ObjectInputStream.java:1620)
> 	at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:719)
> 	at java.io.ObjectInputStream.readClassDescriptor(ObjectInputStream.java:882)
> 	at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1815)
> 	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2000)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
> 	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
> 	at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeCondition(NFA.java:1211)
> 	at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeStates(NFA.java:1169)
> 	at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:957)
> 	at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:903)
> 	... 17 more

-- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7435) FsStateBackend with incremental backup enable does not work with Keyed CEP
[ https://issues.apache.org/jira/browse/FLINK-7435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16124875#comment-16124875 ] Kostas Kloudas commented on FLINK-7435:

The problem here seems to be in the {{copy()}} method of the {{NFA}}, which currently returns {{this}} rather than an actual copy.
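The diagnosis above is that `copy()` returns `this`, so the copy-on-write state table ends up sharing mutable state. The fix requires a genuinely independent object. A minimal JDK-only sketch (not Flink's actual `NFASerializer`, which uses its own binary format) of a deep copy via a serialization round-trip:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class DeepCopy {

    // Defensive deep copy via a serialization round-trip: write the object to
    // bytes, then read back a fresh instance. A copy() that merely returns
    // `this` lets later mutations corrupt state that a copy-on-write table
    // assumed was frozen at checkpoint time.
    @SuppressWarnings("unchecked")
    public static <T extends Serializable> T copy(T value)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }
}
```

A round-trip copy is slower than a field-by-field clone but guarantees the result shares no mutable state with the original, which is exactly the property the `NFA` copy was missing.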