[jira] [Commented] (FLINK-10218) Allow writing DataSet without explicit path parameter
[ https://issues.apache.org/jira/browse/FLINK-10218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593154#comment-16593154 ] ASF GitHub Bot commented on FLINK-10218: link3280 opened a new pull request #6616: [FLINK-10218] Allow writing DataSet without explicit path parameter URL: https://github.com/apache/flink/pull/6616 ## What is the purpose of the change Add a file output helper method, which requires only a FileOutputFormat parameter, to the DataSet API. This avoids setting duplicate path parameters, since the output path can be found in the FileOutputFormat. ## Brief change log - *Add a file output helper method, which requires only a FileOutputFormat parameter, to the DataSet API.* ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Allow writing DataSet without explicit path parameter > - > > Key: FLINK-10218 > URL: https://issues.apache.org/jira/browse/FLINK-10218 > Project: Flink > Issue Type: Improvement > Components: DataSet API >Affects Versions: 1.6.0 >Reporter: Paul Lin >Priority: Minor > Labels: pull-request-available > > Currently, DataSet API has two overloaded `write` methods for using > FileOutputFormat as output format, and both require a path parameter, but the > output path could already be set in the FileOutputFormat object. What's more, > the subclasses of FileOutputFormat mostly don't have default constructors and > required a path parameter too, so users have to set output path twice in the > code, like: > {code:java} > String output = "hdfs:///tmp/"; > dataset.write(new TextOutputFormat<>(new Path(output)), output); > {code} > So I propose to add another write helper method that requires no path > parameter. May someone assign this issue to me? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
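The proposal in FLINK-10218 can be illustrated with a minimal sketch. The classes `SketchFileOutputFormat` and `SketchDataSet` below are hypothetical stand-ins, not the Flink classes, and the method names are assumptions made for illustration only. The idea shown is simply that a one-argument `write` overload can delegate to the existing two-argument one, reading the path from the format.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a file-based output format that may already carry its output path.
class SketchFileOutputFormat {
    private final String outputPath; // null means no path was configured on the format

    SketchFileOutputFormat(String outputPath) {
        this.outputPath = outputPath;
    }

    String getOutputPath() {
        return outputPath;
    }
}

// Hypothetical stand-in for the DataSet API; it only records the paths it would write to.
class SketchDataSet {
    final List<String> sinkPaths = new ArrayList<>();

    // Existing style: the path is passed explicitly, even if the format already knows it.
    void write(SketchFileOutputFormat format, String filePath) {
        sinkPaths.add(filePath);
    }

    // Proposed helper (assumed shape): take the path from the format itself.
    void write(SketchFileOutputFormat format) {
        String path = format.getOutputPath();
        if (path == null) {
            throw new IllegalStateException("The output format has no output path set");
        }
        write(format, path);
    }
}
```

With such a helper, the example from the issue would shrink to a single mention of the path, for instance `dataset.write(new SketchFileOutputFormat("hdfs:///tmp/"))` in terms of the stand-in classes. Failing fast when the format has no path is one possible design choice; the actual pull request may handle that case differently.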
[jira] [Updated] (FLINK-10218) Allow writing DataSet without explicit path parameter
[ https://issues.apache.org/jira/browse/FLINK-10218?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-10218: --- Labels: pull-request-available (was: ) > Allow writing DataSet without explicit path parameter > - > > Key: FLINK-10218 > URL: https://issues.apache.org/jira/browse/FLINK-10218 > Project: Flink > Issue Type: Improvement > Components: DataSet API >Affects Versions: 1.6.0 >Reporter: Paul Lin >Priority: Minor > Labels: pull-request-available > > Currently, DataSet API has two overloaded `write` methods for using > FileOutputFormat as output format, and both require a path parameter, but the > output path could already be set in the FileOutputFormat object. What's more, > the subclasses of FileOutputFormat mostly don't have default constructors and > required a path parameter too, so users have to set output path twice in the > code, like: > {code:java} > String output = "hdfs:///tmp/"; > dataset.write(new TextOutputFormat<>(new Path(output)), output); > {code} > So I propose to add another write helper method that requires no path > parameter. May someone assign this issue to me? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10218) Allow writing DataSet without explicit path parameter
[ https://issues.apache.org/jira/browse/FLINK-10218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593135#comment-16593135 ] vinoyang commented on FLINK-10218: -- [~Paul Lin] Currently you may have no contributor permissions, ping [~till.rohrmann] [~Zentol] > Allow writing DataSet without explicit path parameter > - > > Key: FLINK-10218 > URL: https://issues.apache.org/jira/browse/FLINK-10218 > Project: Flink > Issue Type: Improvement > Components: DataSet API >Affects Versions: 1.6.0 >Reporter: Paul Lin >Priority: Minor > > Currently, DataSet API has two overloaded `write` methods for using > FileOutputFormat as output format, and both require a path parameter, but the > output path could already be set in the FileOutputFormat object. What's more, > the subclasses of FileOutputFormat mostly don't have default constructors and > required a path parameter too, so users have to set output path twice in the > code, like: > {code:java} > String output = "hdfs:///tmp/"; > dataset.write(new TextOutputFormat<>(new Path(output)), output); > {code} > So I propose to add another write helper method that requires no path > parameter. May someone assign this issue to me? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10170) Support string representation for map types in descriptor-based Table API
[ https://issues.apache.org/jira/browse/FLINK-10170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593132#comment-16593132 ] ASF GitHub Bot commented on FLINK-10170: tragicjun commented on issue #6578: [FLINK-10170] [table] Support string representation for map types in descriptor-based Table API URL: https://github.com/apache/flink/pull/6578#issuecomment-416097515 @twalthr could you review this again? Is there anything else I should do before the merge? Thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Support string representation for map types in descriptor-based Table API > - > > Key: FLINK-10170 > URL: https://issues.apache.org/jira/browse/FLINK-10170 > Project: Flink > Issue Type: Improvement > Components: Table API SQL >Affects Versions: 1.6.1, 1.7.0 >Reporter: Jun Zhang >Assignee: Jun Zhang >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > Attachments: flink-10170 > > > Since 1.6, the recommended way of creating source/sink tables is to use > connector/format/schema descriptors. However, when declaring map types in > the schema descriptor, the following exception is thrown: > {quote}org.apache.flink.table.api.TableException: A string representation for > map types is not supported yet.{quote} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10216) Add REGEXP_MATCH in TableAPI and SQL
[ https://issues.apache.org/jira/browse/FLINK-10216?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593129#comment-16593129 ] vinoyang commented on FLINK-10216: -- [~juho.autio.r] Flink already has a function for regular expression matching, called "similar to", which is provided by Apache Calcite. Reference documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/sql.html#built-in-functions > Add REGEXP_MATCH in TableAPI and SQL > > > Key: FLINK-10216 > URL: https://issues.apache.org/jira/browse/FLINK-10216 > Project: Flink > Issue Type: Sub-task >Reporter: Juho Autio >Priority: Major > > Here's a naive implementation: > {code:java} > public class RegexpMatchFunction extends ScalarFunction { > // NOTE! Flink calls eval() by reflection > public boolean eval(String value, String pattern) { > return value != null && pattern != null && value.matches(pattern); > } > } > {code} > I wonder if there would be a way to optimize this to use > {{Pattern.compile(pattern)}} and reuse the compiled Pattern for multiple calls > (possibly different values, but same pattern). > h3. Naming > Should regex functions be prefixed with {{regexp_}} or {{regex_}}? See also: > [https://github.com/apache/flink/pull/6448#issuecomment-415972833] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
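The optimization the reporter wonders about in FLINK-10216, compiling each pattern once and reusing it across calls, could look roughly like the following. This is only a sketch: `CachedRegexpMatch` is a hypothetical plain class that does not extend Flink's `ScalarFunction`, and the unbounded `HashMap` cache is an assumption chosen for brevity.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical stand-in for the RegexpMatchFunction from the issue, with a per-instance pattern cache.
class CachedRegexpMatch {
    // Maps the pattern string to its compiled form so each pattern is compiled only once.
    private final Map<String, Pattern> compiled = new HashMap<>();

    public boolean eval(String value, String pattern) {
        if (value == null || pattern == null) {
            return false;
        }
        Pattern p = compiled.computeIfAbsent(pattern, Pattern::compile);
        // matches() requires the whole value to match, like String.matches() in the naive version.
        return p.matcher(value).matches();
    }

    // Exposed only so the caching behaviour can be observed.
    int cacheSize() {
        return compiled.size();
    }
}
```

In a real user-defined function the cache would probably have to be created on the worker rather than shipped with the serialized function, and a bounded cache may be preferable if patterns come from data rather than from constants. Both points are assumptions, not something the issue settles.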
[jira] [Commented] (FLINK-9990) Add regex_extract supported in TableAPI and SQL
[ https://issues.apache.org/jira/browse/FLINK-9990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593121#comment-16593121 ] ASF GitHub Bot commented on FLINK-9990: --- yanghua commented on issue #6448: [FLINK-9990] [table] Add regex_extract supported in TableAPI and SQL URL: https://github.com/apache/flink/pull/6448#issuecomment-416095640 @juhoautio Yes, you are right, agree! This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add regex_extract supported in TableAPI and SQL > --- > > Key: FLINK-9990 > URL: https://issues.apache.org/jira/browse/FLINK-9990 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: vinoyang >Assignee: vinoyang >Priority: Minor > Labels: pull-request-available > > regex_extract is a very useful function; it returns a string based on a regex > pattern and an index. > For example: > {code:java} > regexp_extract('foothebar', 'foo(.*?)(bar)', 2) // returns 'bar' > {code} > It is provided as a UDF in Hive; for more details, see [1]. > [1]: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF -- This message was sent by Atlassian JIRA (v7.6.3#76005)
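The behaviour described in FLINK-9990 (return the capture group with the given index from the first match) can be sketched with `java.util.regex`. The class `RegexExtractSketch` and its method name are hypothetical, and returning `null` when nothing matches or the index is out of range is an assumption; the function under review may define those cases differently.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper illustrating the semantics of a regex extract function.
class RegexExtractSketch {
    // Returns the text of capture group `groupIndex` from the first match, or null (assumed) otherwise.
    static String regexExtract(String input, String regex, int groupIndex) {
        if (input == null || regex == null) {
            return null;
        }
        Matcher m = Pattern.compile(regex).matcher(input);
        if (m.find() && groupIndex >= 0 && groupIndex <= m.groupCount()) {
            return m.group(groupIndex);
        }
        return null;
    }
}
```

For the input from the issue, `regexExtract("foothebar", "foo(.*?)(bar)", 2)` yields `bar`, index 1 yields `the`, and index 0 yields the whole match `foothebar`.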
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593118#comment-16593118 ] ASF GitHub Bot commented on FLINK-9311: --- yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r212852318 ## File path: flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSink.java ## @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.pubsub; + +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.connectors.pubsub.common.SerializableCredentialsProvider; + +import com.google.api.gax.core.CredentialsProvider; +import com.google.api.gax.grpc.GrpcTransportChannel; +import com.google.api.gax.rpc.FixedTransportChannelProvider; +import com.google.api.gax.rpc.TransportChannel; +import com.google.auth.Credentials; +import com.google.cloud.pubsub.v1.Publisher; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.ProjectTopicName; +import com.google.pubsub.v1.PubsubMessage; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; + +import java.io.IOException; + +/** + * A sink function that outputs to PubSub. + * + * @param type of PubSubSink messages to write + */ +public class PubSubSink extends RichSinkFunction { + + private SerializableCredentialsProvider serializableCredentialsProvider; + private SerializationSchema serializationSchema; + private String projectName; + private String topicName; + private String hostAndPort = null; + + private transient Publisher publisher; + + private PubSubSink() { Review comment: Sorry, I missed your `PubSubSinkBuilder`, I just saw that you have done a lot of NPE checks on the `initialize` method in this class. If they are mandatory fields, you can pass them directly in the constructor of `PubSubSinkBuilder`. Maybe this can save these checks? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > PubSub connector > > > Key: FLINK-9311 > URL: https://issues.apache.org/jira/browse/FLINK-9311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Richard Deurwaarder >Assignee: Niels Basjes >Priority: Minor > Labels: pull-request-available > > I would like start adding some google cloud connectors starting with a PubSub > Source. I have a basic implementation ready but I want it to be able to: > * easily scale up (should I have it extend RichParallelSourceFunction for > this?) > * Make it easier to provide the google cloud credentials. This would require > being able to send some json string / ServiceAccount to the nodes when > starting up this source. > Could this be something that would be useful for others and added to the > flink connectors repo? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
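The review comment on `PubSubSink` suggests passing mandatory fields to the builder up front instead of null-checking them later in `initialize`. A minimal sketch of that pattern follows. `TopicSinkSketch`, its builder, and the choice of which fields are mandatory are all assumptions for illustration; this is not the code from the pull request.

```java
import java.util.Objects;

// Hypothetical sink-like class with two mandatory fields and one optional field.
class TopicSinkSketch {
    final String projectName;
    final String topicName;
    final String hostAndPort; // optional, may be null

    private TopicSinkSketch(Builder builder) {
        this.projectName = builder.projectName;
        this.topicName = builder.topicName;
        this.hostAndPort = builder.hostAndPort;
    }

    // Mandatory values are required to obtain a builder at all.
    static Builder newBuilder(String projectName, String topicName) {
        return new Builder(projectName, topicName);
    }

    static final class Builder {
        private final String projectName;
        private final String topicName;
        private String hostAndPort;

        private Builder(String projectName, String topicName) {
            // Validate once, here, so later code does not need repeated null checks.
            this.projectName = Objects.requireNonNull(projectName, "projectName");
            this.topicName = Objects.requireNonNull(topicName, "topicName");
        }

        Builder withHostAndPort(String hostAndPort) {
            this.hostAndPort = hostAndPort;
            return this;
        }

        TopicSinkSketch build() {
            return new TopicSinkSketch(this);
        }
    }
}
```

A call such as `TopicSinkSketch.newBuilder("my-project", "my-topic").withHostAndPort("localhost:8085").build()` then cannot produce an instance with a missing project or topic, which is the saving the reviewer hints at.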
[GitHub] yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r212851768 ## File path: flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/Bound.java ## @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.pubsub; + +import org.apache.flink.streaming.api.functions.source.SourceFunction; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.Timer; +import java.util.TimerTask; + +class Bound implements Serializable { + private static final Logger LOG = LoggerFactory.getLogger(Bound.class); + + private final Bound.Mode mode; + private final long maxMessagedReceived; + private final long maxTimeBetweenMessages; + + private SourceFunction sourceFunction; + private transient Timer timer; + private long messagesReceived; + private long lastReceivedMessage; + private boolean cancelled = false; + + private Bound(Bound.Mode mode, long maxMessagedReceived, long maxTimeBetweenMessages) { + this.mode = mode; + this.maxMessagedReceived = maxMessagedReceived; + this.maxTimeBetweenMessages = maxTimeBetweenMessages; + this.messagesReceived = 0L; + } + + static Bound boundByAmountOfMessages(long maxMessagedReceived) { + return new Bound<>(Mode.COUNTER, maxMessagedReceived, 0L); + } + + static Bound boundByTimeSinceLastMessage(long maxTimeBetweenMessages) { + return new Bound<>(Mode.TIMER, 0L, maxTimeBetweenMessages); + } + + static Bound boundByAmountOfMessagesOrTimeSinceLastMessage(long maxMessagedReceived, long maxTimeBetweenMessages) { + return new Bound<>(Mode.COUNTER_OR_TIMER, maxMessagedReceived, maxTimeBetweenMessages); + } + + private TimerTask shutdownPubSubSource() { + return new TimerTask() { + @Override + public void run() { + if (maxTimeBetweenMessagesElapsed()) { + cancelPubSubSource("BoundedSourceFunction: Idle timeout --> canceling source"); + timer.cancel(); + } + } + }; + } + + private synchronized boolean maxTimeBetweenMessagesElapsed() { + return System.currentTimeMillis() - lastReceivedMessage > maxTimeBetweenMessages; + } + + private synchronized void cancelPubSubSource(String logMessage) { + if (!cancelled) { + cancelled = true; + 
sourceFunction.cancel(); + LOG.info(logMessage); + } + } + + void start(SourceFunction sourceFunction) { + if (this.sourceFunction != null) { + throw new IllegalStateException("start() already called"); + } + + this.sourceFunction = sourceFunction; + messagesReceived = 0; + + if (mode == Mode.TIMER || mode == Mode.COUNTER_OR_TIMER) { + lastReceivedMessage = System.currentTimeMillis(); + timer = new Timer(); + timer.schedule(shutdownPubSubSource(), 0, 100); + } + } + + synchronized void receivedMessage() { + if (sourceFunction == null) { + throw new IllegalStateException("start() not called"); + } + + lastReceivedMessage = System.currentTimeMillis(); + messagesReceived++; Review comment: yes, I mean to expose the number of messages received as metrics, just an idea, not sure if it is a good suggestion. This is an automated message from the Apache Git Service. To respond to the
[jira] [Updated] (FLINK-10218) Allow writing DataSet without explicit path parameter
[ https://issues.apache.org/jira/browse/FLINK-10218?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Paul Lin updated FLINK-10218: - Description: Currently, DataSet API has two overloaded `write` methods for using FileOutputFormat as output format, and both require a path parameter, but the output path could already be set in the FileOutputFormat object. What's more, the subclasses of FileOutputFormat mostly don't have default constructors and required a path parameter too, so users have to set output path twice in the code, like: {code:java} String output = "hdfs:///tmp/"; dataset.write(new TextOutputFormat<>(new Path(output)), output); {code} So I propose to add another write helper method that requires no path parameter. May someone assign this issue to me? was: Currently, DataSet API has two overloaded write methods for using FileOutputFormat as output format, and both require a path parameter, but the output path could already be set in the FileOutputFormat object. What's more, the subclasses of FileOutputFormat mostly don't have default constructors and required a path parameter too, so users have to set output path twice in the code, like: {code:java} String output = "hdfs:///tmp/"; dataset.write(new TextOutputFormat<>(new Path(output)), output); {code} So I propose to add another write method that requires no path parameter. May someone assign this issue to me? 
Summary: Allow writing DataSet without explicit path parameter (was: Allow write DataSet without path parameter) > Allow writing DataSet without explicit path parameter > - > > Key: FLINK-10218 > URL: https://issues.apache.org/jira/browse/FLINK-10218 > Project: Flink > Issue Type: Improvement > Components: DataSet API >Affects Versions: 1.6.0 >Reporter: Paul Lin >Priority: Minor > > Currently, DataSet API has two overloaded `write` methods for using > FileOutputFormat as output format, and both require a path parameter, but the > output path could already be set in the FileOutputFormat object. What's more, > the subclasses of FileOutputFormat mostly don't have default constructors and > required a path parameter too, so users have to set output path twice in the > code, like: > {code:java} > String output = "hdfs:///tmp/"; > dataset.write(new TextOutputFormat<>(new Path(output)), output); > {code} > So I propose to add another write helper method that requires no path > parameter. May someone assign this issue to me? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask
[ https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593082#comment-16593082 ] JIN SUN commented on FLINK-10205: - Great, Stephan, I actually have the same idea. For batch failover, there are a bunch of things we need to consider and discuss. I can prepare a FLIP. > Batch Job: InputSplit Fault tolerant for DataSourceTask > --- > > Key: FLINK-10205 > URL: https://issues.apache.org/jira/browse/FLINK-10205 > Project: Flink > Issue Type: Sub-task > Components: JobManager >Reporter: JIN SUN >Assignee: JIN SUN >Priority: Major > Original Estimate: 168h > Remaining Estimate: 168h > > Today, DataSourceTask pulls InputSplits from the JobManager to achieve better > performance. However, when a DataSourceTask fails and is rerun, it will not get > the same splits as its previous attempt. This can introduce inconsistent > results or even data corruption. > Furthermore, if two executions run at the same time (in the batch > scenario), these two executions should process the same splits. > We need to fix this issue to make the inputs of a DataSourceTask > deterministic. The proposal is to save all splits in the ExecutionVertex and > have the DataSourceTask pull splits from there. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
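The proposal in FLINK-10205, remembering which splits each vertex received so that a rerun gets the same ones, can be sketched as a small assigner. `ReplayableSplitAssigner` is a hypothetical class, splits are modelled as plain strings, and vertices are identified by an integer id; none of this reflects the actual JobManager or ExecutionVertex code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical assigner that records per-vertex split history and replays it after a restart.
class ReplayableSplitAssigner {
    private final Queue<String> unassigned;
    private final Map<Integer, List<String>> history = new HashMap<>();
    private final Map<Integer, Integer> position = new HashMap<>();

    ReplayableSplitAssigner(List<String> splits) {
        this.unassigned = new ArrayDeque<>(splits);
    }

    // Returns the next split for the vertex, or null when no split is left.
    synchronized String nextSplit(int vertexId) {
        List<String> assigned = history.computeIfAbsent(vertexId, k -> new ArrayList<>());
        int pos = position.getOrDefault(vertexId, assigned.size());
        if (pos < assigned.size()) {
            // A restarted attempt first replays what the previous attempt was given.
            position.put(vertexId, pos + 1);
            return assigned.get(pos);
        }
        String split = unassigned.poll();
        if (split != null) {
            assigned.add(split);
            position.put(vertexId, assigned.size());
        }
        return split;
    }

    // Called when the task for this vertex fails and is rerun.
    synchronized void restart(int vertexId) {
        position.put(vertexId, 0);
    }
}
```

After `restart(vertexId)`, the rerun attempt sees its earlier splits in the original order before any new split is handed out, which is the determinism the issue asks for. How concurrent (speculative) attempts of the same vertex would share this history is left open here.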
[jira] [Commented] (FLINK-8354) Flink Kafka connector ignores Kafka message headers
[ https://issues.apache.org/jira/browse/FLINK-8354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593020#comment-16593020 ] ASF GitHub Bot commented on FLINK-8354: --- alexeyt820 opened a new pull request #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers URL: https://github.com/apache/flink/pull/6615 ## What is the purpose of the change This change allows accessing Kafka headers in KeyedDeserializationSchema and providing Kafka headers via the KeyedSerializationSchema.headers method ## Brief change log - Add default _headers_ method to KeyedSerializationSchema interface - Add default deserialize method with _headers_ argument to KeyedDeserializationSchema - Add Kafka011Fetcher to get headers from Kafka ConsumerRecord and provide them as a parameter to KeyedDeserializationSchema - Modify FlinkKafkaProducer011 to request _headers_ from KeyedSerializationSchema and add them to ProducerRecord. ## Verifying this change This change added tests and can be verified as follows: - *Added integration test Kafka011ITCase.testHeaders to verify that we can produce headers and consume them* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (yes) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (yes) - If yes, how is the feature documented? (JavaDocs) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Flink Kafka connector ignores Kafka message headers > - > > Key: FLINK-8354 > URL: https://issues.apache.org/jira/browse/FLINK-8354 > Project: Flink > Issue Type: Bug > Components: Kafka Connector > Environment: Kafka 0.11.0.0 > Flink 1.4.0 > flink-connector-kafka-0.11_2.11 >Reporter: Mohammad Abareghi >Assignee: Aegeaner >Priority: Major > Labels: pull-request-available > > Kafka has introduced notion of Header for messages in version 0.11.0.0 > https://issues.apache.org/jira/browse/KAFKA-4208. > But flink-connector-kafka-0.11_2.11 which supports kafka 0.11.0.0 ignores > headers when consuming kafka messages. > It would be useful in some scenarios, such as distributed log tracing, to > support message headers to FlinkKafkaConsumer011 and FlinkKafkaProducer011. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
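The "default method" approach named in the change log can be sketched in plain Java. The interface `SketchSerializationSchema` and its `headers` signature below are assumptions made for illustration; they are not the actual `KeyedSerializationSchema` API from the pull request. The point is only that a default method lets existing implementations compile unchanged while new ones opt in to headers.

```java
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical schema interface; the real Flink interface may look different.
interface SketchSerializationSchema<T> {
    byte[] serializeValue(T element);

    // Default method: implementations written before headers existed
    // inherit "no headers" without any code change.
    default List<Map.Entry<String, byte[]>> headers(T element) {
        return Collections.emptyList();
    }
}

// An implementation that predates headers and does not override the default.
class LegacyStringSchema implements SketchSerializationSchema<String> {
    @Override
    public byte[] serializeValue(String element) {
        return element.getBytes(StandardCharsets.UTF_8);
    }
}

// An implementation that attaches a tracing header to every record.
class TracingStringSchema implements SketchSerializationSchema<String> {
    @Override
    public byte[] serializeValue(String element) {
        return element.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public List<Map.Entry<String, byte[]>> headers(String element) {
        byte[] traceId = ("trace-" + element.length()).getBytes(StandardCharsets.UTF_8);
        return Collections.singletonList(
                new AbstractMap.SimpleImmutableEntry<>("trace-id", traceId));
    }
}
```

A producer in this sketch would call `headers(element)` for every record and copy the entries onto the outgoing record, which is why the pull request flags the per-record code path as affected.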
[jira] [Updated] (FLINK-8354) Flink Kafka connector ignores Kafka message headers
[ https://issues.apache.org/jira/browse/FLINK-8354?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-8354: -- Labels: pull-request-available (was: ) > Flink Kafka connector ignores Kafka message headers > - > > Key: FLINK-8354 > URL: https://issues.apache.org/jira/browse/FLINK-8354 > Project: Flink > Issue Type: Bug > Components: Kafka Connector > Environment: Kafka 0.11.0.0 > Flink 1.4.0 > flink-connector-kafka-0.11_2.11 >Reporter: Mohammad Abareghi >Assignee: Aegeaner >Priority: Major > Labels: pull-request-available > > Kafka has introduced notion of Header for messages in version 0.11.0.0 > https://issues.apache.org/jira/browse/KAFKA-4208. > But flink-connector-kafka-0.11_2.11 which supports kafka 0.11.0.0 ignores > headers when consuming kafka messages. > It would be useful in some scenarios, such as distributed log tracing, to > support message headers to FlinkKafkaConsumer011 and FlinkKafkaProducer011. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] alexeyt820 opened a new pull request #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers
alexeyt820 opened a new pull request #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers URL: https://github.com/apache/flink/pull/6615 ## What is the purpose of the change This change allows accessing Kafka headers in KeyedDeserializationSchema and providing Kafka headers via the KeyedSerializationSchema.headers method ## Brief change log - Add default _headers_ method to KeyedSerializationSchema interface - Add default deserialize method with _headers_ argument to KeyedDeserializationSchema - Add Kafka011Fetcher to get headers from Kafka ConsumerRecord and provide them as a parameter to KeyedDeserializationSchema - Modify FlinkKafkaProducer011 to request _headers_ from KeyedSerializationSchema and add them to ProducerRecord. ## Verifying this change This change added tests and can be verified as follows: - *Added integration test Kafka011ITCase.testHeaders to verify that we can produce headers and consume them* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (yes) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (yes) - If yes, how is the feature documented? (JavaDocs) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-9172) Support external catalog factory that comes default with SQL-Client
[ https://issues.apache.org/jira/browse/FLINK-9172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16592927#comment-16592927 ] Rong Rong commented on FLINK-9172: -- [~eronwright] Yes, you are right. This is the additional feature in FLIP-24 that needs to be implemented. > Support external catalog factory that comes default with SQL-Client > --- > > Key: FLINK-9172 > URL: https://issues.apache.org/jira/browse/FLINK-9172 > Project: Flink > Issue Type: Sub-task >Reporter: Rong Rong >Assignee: Rong Rong >Priority: Major > > It would be great for the SQL-Client to support some external catalogs > out-of-the-box for SQL users to configure and utilize easily. I am currently > thinking of having an external catalog factory that spins up both streaming and > batch external catalog table sources and sinks. This could greatly unify and > provide easy access for SQL users. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8532) RebalancePartitioner should use Random value for its first partition
[ https://issues.apache.org/jira/browse/FLINK-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16592910#comment-16592910 ] ASF GitHub Bot commented on FLINK-8532: --- StephanEwen commented on issue #6544: [FLINK-8532] [Streaming] modify RebalancePartitioner to use a random partition as its first partition URL: https://github.com/apache/flink/pull/6544#issuecomment-416043267 Thanks for taking a deeper look. Unfortunately, divisions (modulo) are even more expensive, so it would be good to avoid them. I think the solution can actually be a bit simpler. It would probably be sufficient to simply initialize the array to `INT_MAX - 1` and replace the `this.returnArray[0] = 0;` in the original code with `this.returnArray[0] = resetValue()`. Inside `resetValue()` you can do the initialization. That way, common cases have no additional check, and the overflow/reset case gets one additional branch, which is already a good improvement. We could possibly do a followup optimization, where outputs that have only one channel swap in a special selector that always returns just `0`. The one-channel-only case is probably the one that would be affected most by this change, because it overflows on every call. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > RebalancePartitioner should use Random value for its first partition > > > Key: FLINK-8532 > URL: https://issues.apache.org/jira/browse/FLINK-8532 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Yuta Morisawa >Assignee: Guibo Pan >Priority: Major > Labels: pull-request-available > > In some conditions, RebalancePartitioner doesn't balance data correctly > because it uses the same value for selecting next operators. 
> RebalancePartitioner initializes its partition id using the same value in > every thread, so it does balance data overall, but at any given moment the amount of > data in each operator is skewed. > Particularly, when the data rates of the upstream operators are equal, the data skew > becomes severe. > > > Example: > Consider a simple operator chain. > -> map1 -> rebalance -> map2 -> > Each map operator (map1, map2) contains three subtasks (subtasks 1, 2, 3 and 4, 5, > 6, respectively). > map1 map2 > st1 st4 > st2 st5 > st3 st6 > > At the beginning, every subtask in map1 sends data to st4 in map2 because > they use the same initial partition id. > The next time map1 receives data, st1, st2 and st3 send data to st5 because they > incremented their partition ids when they processed the previous data. > In my environment, it takes twice as long to process data when I use > RebalancePartitioner as when I use other partitioners (rescale, keyby). > > To solve this problem, in my opinion, RebalancePartitioner should use its own > operator id for the initial value. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
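The skew described in FLINK-8532 can be reproduced with a tiny simulation. The class `RebalanceSkewSketch` below is a hypothetical model, not Flink code: every sender does plain round-robin over the receivers, and the helper counts how many distinct receivers are hit at one step. Identical starting channels make all senders hit the same receiver at every step, while distinct (for example random or subtask-derived) starting channels spread the load.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical model of several senders doing round-robin over the same receivers.
class RebalanceSkewSketch {
    // Channel a sender picks at a given step, starting from its initial channel.
    // Modulo is fine here because this is a simulation, not a per-record hot path.
    static int channelAt(int start, int step, int numChannels) {
        return (start + step) % numChannels;
    }

    // Number of distinct receivers that all senders together hit at one step.
    static int distinctReceivers(int[] starts, int step, int numChannels) {
        boolean[] hit = new boolean[numChannels];
        int distinct = 0;
        for (int start : starts) {
            int channel = channelAt(start, step, numChannels);
            if (!hit[channel]) {
                hit[channel] = true;
                distinct++;
            }
        }
        return distinct;
    }

    // One possible remedy: each sender draws its own random starting channel.
    static int randomStart(int numChannels) {
        return ThreadLocalRandom.current().nextInt(numChannels);
    }
}
```

With three senders and three receivers, `distinctReceivers(new int[] {0, 0, 0}, step, 3)` is 1 for every step, whereas `distinctReceivers(new int[] {0, 1, 2}, step, 3)` is 3. Random starts do not guarantee a perfect spread, but they make the fully aligned case unlikely.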
[GitHub] StephanEwen commented on issue #6544: [FLINK-8532] [Streaming] modify RebalancePartitioner to use a random partition as its first partition
StephanEwen commented on issue #6544: [FLINK-8532] [Streaming] modify RebalancePartitioner to use a random partition as its first partition URL: https://github.com/apache/flink/pull/6544#issuecomment-416043267 Thanks for taking a deeper look. Unfortunately, divisions (modulo) are even more expensive, so it would be good to avoid them. I think the solution can actually be a bit simpler. It would probably be sufficient to simply initialize the array to `INT_MAX - 1` and replace the `this.returnArray[0] = 0;` in the original code with `this.returnArray[0] = resetValue()`. Inside `resetValue()` you can do the initialization. That way, common cases have no additional check, and the overflow/reset case gets one additional branch, which is already a good improvement. We could possibly do a followup optimization, where outputs that have only one channel swap in a special selector that always returns just `0`. The one-channel-only case is probably the one that would be affected most by this change, because it overflows on every call. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
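One possible reading of the suggestion above, as a self-contained sketch. The class `RoundRobinSelectorSketch` is hypothetical, and the "increment, then compare against the channel count" shape of the original code is an assumption; only `returnArray`, `resetValue()` and the `INT_MAX - 1` start value come from the comment. The first call lands in the reset branch and picks a random channel; later calls only pay for an increment and one comparison, with no modulo.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical selector; not the actual RebalancePartitioner.
class RoundRobinSelectorSketch {
    // Starting just below the int maximum forces the reset branch on the first call.
    private final int[] returnArray = new int[] {Integer.MAX_VALUE - 1};
    private boolean initialized = false;

    int[] selectChannels(int numberOfOutputChannels) {
        int newChannel = ++returnArray[0];
        if (newChannel >= numberOfOutputChannels) {
            // Rare path: first call or wrap-around.
            returnArray[0] = resetValue(numberOfOutputChannels);
        }
        return returnArray;
    }

    // Returns a random channel on the first reset and 0 on every later wrap-around.
    private int resetValue(int numberOfOutputChannels) {
        if (!initialized) {
            initialized = true;
            return ThreadLocalRandom.current().nextInt(numberOfOutputChannels);
        }
        return 0;
    }
}
```

With a single output channel the reset branch is taken on every call, which matches the remark that the one-channel case is affected most and may deserve a dedicated selector.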
[jira] [Commented] (FLINK-5315) Support distinct aggregations in table api
[ https://issues.apache.org/jira/browse/FLINK-5315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16592884#comment-16592884 ] ASF GitHub Bot commented on FLINK-5315: --- hequn8128 commented on a change in pull request #6521: [FLINK-5315][table] Adding support for distinct operation for table API on DataStream URL: https://github.com/apache/flink/pull/6521#discussion_r212824390 ## File path: docs/dev/table/tableApi.md ## @@ -381,6 +381,36 @@ Table result = orders {% highlight java %} Table orders = tableEnv.scan("Orders"); Table result = orders.distinct(); +{% endhighlight %} +Note: For streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct fields. Please provide a query configuration with valid retention interval to prevent excessive state size. See Streaming Concepts for details. + + + + +Distinct Aggregation +Streaming Review comment: Add batch label? It seems batch already support distinct. BTW, over is not yet supported in batch. Maybe it is better to remove these documents about distinct aggregation? Append a distinct column in all Aggregation examples? For `GroupBy Aggregation`, change the sql from ``` Table orders = tableEnv.scan("Orders"); Table result = orders.groupBy("a").select("a, b.sum as d"); ``` to ``` Table orders = tableEnv.scan("Orders"); Table result = orders.groupBy("a").select("a, b.sum as d, b.sum.distinct as e"); ``` What's more, we probably should add UDAGG document like it in SQL document about Aggregations and add distinct column. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Support distinct aggregations in table api > -- > > Key: FLINK-5315 > URL: https://issues.apache.org/jira/browse/FLINK-5315 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Kurt Young >Assignee: Rong Rong >Priority: Major > Labels: pull-request-available > > Support distinct aggregations in Table API in the following format: > For Expressions: > {code:scala} > 'a.count.distinct // Expressions distinct modifier > {code} > For User-defined Function: > {code:scala} > singleArgUdaggFunc.distinct('a) // FunctionCall distinct modifier > multiArgUdaggFunc.distinct('a, 'b) // FunctionCall distinct modifier > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] hequn8128 commented on a change in pull request #6521: [FLINK-5315][table] Adding support for distinct operation for table API on DataStream
hequn8128 commented on a change in pull request #6521: [FLINK-5315][table] Adding support for distinct operation for table API on DataStream URL: https://github.com/apache/flink/pull/6521#discussion_r212824390 ## File path: docs/dev/table/tableApi.md ## @@ -381,6 +381,36 @@ Table result = orders {% highlight java %} Table orders = tableEnv.scan("Orders"); Table result = orders.distinct(); +{% endhighlight %} +Note: For streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct fields. Please provide a query configuration with valid retention interval to prevent excessive state size. See Streaming Concepts for details. + + + + +Distinct Aggregation +Streaming Review comment: Add batch label? It seems batch already support distinct. BTW, over is not yet supported in batch. Maybe it is better to remove these documents about distinct aggregation? Append a distinct column in all Aggregation examples? For `GroupBy Aggregation`, change the sql from ``` Table orders = tableEnv.scan("Orders"); Table result = orders.groupBy("a").select("a, b.sum as d"); ``` to ``` Table orders = tableEnv.scan("Orders"); Table result = orders.groupBy("a").select("a, b.sum as d, b.sum.distinct as e"); ``` What's more, we probably should add UDAGG document like it in SQL document about Aggregations and add distinct column. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Created] (FLINK-10219) remove the IRC mention from the website
Nicos Maris created FLINK-10219: --- Summary: remove the IRC mention from the website Key: FLINK-10219 URL: https://issues.apache.org/jira/browse/FLINK-10219 Project: Flink Issue Type: Task Components: Project Website Reporter: Nicos Maris This is the outcome of the following ticket along with a PR. https://issues.apache.org/jira/browse/FLINK-10217 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9432) Support extract epoch, decade, millisecond, microsecond
[ https://issues.apache.org/jira/browse/FLINK-9432?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-9432: -- Labels: pull-request-available (was: ) > Support extract epoch, decade, millisecond, microsecond > --- > > Key: FLINK-9432 > URL: https://issues.apache.org/jira/browse/FLINK-9432 > Project: Flink > Issue Type: New Feature >Reporter: Sergey Nuyanzin >Assignee: Sergey Nuyanzin >Priority: Major > Labels: pull-request-available > > The task is to separate activity from depending on > https://issues.apache.org/jira/browse/CALCITE-2303 from all others that could > be done without upgrade avatica/calcite in > https://issues.apache.org/jira/browse/FLINK-8518 > Now the implementations of next functions are blocked > {code:sql} > extract(decade from ...) > extract(epoch from ...) > extract(millisecond from ...) > extract(microsecond from ...) > extract(isodow from ...) > extract(isoyear from ...) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9432) Support extract epoch, decade, millisecond, microsecond
[ https://issues.apache.org/jira/browse/FLINK-9432?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16592864#comment-16592864 ] ASF GitHub Bot commented on FLINK-9432: --- snuyanzin opened a new pull request #6614: [FLINK-9432] Support extract timeunit: DECADE, EPOCH, ISODOW, ISOYEAR… URL: https://github.com/apache/flink/pull/6614 ## What is the purpose of the change The PR provides an implementation of the `extract` function for the time units `decade`, `epoch`, `isodow`, `isoyear`, `millisecond`, `microsecond` ## Brief change log - support time units in `extract` - documentation updates (mention all possible time units for `extract`) - tests ## Verifying this change - *Added tests validate the result of the `extract` call* - *Added tests validate that `extract` of `isodow`, `isodoy`, `epoch`, `decade` for time is incorrect* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (yes) - If yes, how is the feature documented? (docs) functions.md updated This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Support extract epoch, decade, millisecond, microsecond > --- > > Key: FLINK-9432 > URL: https://issues.apache.org/jira/browse/FLINK-9432 > Project: Flink > Issue Type: New Feature >Reporter: Sergey Nuyanzin >Assignee: Sergey Nuyanzin >Priority: Major > Labels: pull-request-available > > The task is to separate activity from depending on > https://issues.apache.org/jira/browse/CALCITE-2303 from all others that could > be done without upgrade avatica/calcite in > https://issues.apache.org/jira/browse/FLINK-8518 > Now the implementations of next functions are blocked > {code:sql} > extract(decade from ...) > extract(epoch from ...) > extract(millisecond from ...) > extract(microsecond from ...) > extract(isodow from ...) > extract(isoyear from ...) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
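To make some of the time units listed above concrete, here is a small `java.time` sketch. It assumes PostgreSQL-like meanings (decade = year divided by 10, epoch = seconds since 1970-01-01 00:00:00 UTC, isodow = Monday 1 through Sunday 7, isoyear = ISO week-based year); the semantics actually implemented in Flink/Calcite may differ, and the class `ExtractSketch` is purely illustrative.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.temporal.IsoFields;

// Illustrative helpers; the definitions are assumptions, not Flink's implementation.
class ExtractSketch {
    // Year divided by 10, rounded toward negative infinity.
    static long decade(LocalDateTime ts) {
        return Math.floorDiv(ts.getYear(), 10);
    }

    // Seconds since 1970-01-01T00:00:00, treating the timestamp as UTC.
    static long epoch(LocalDateTime ts) {
        return ts.toEpochSecond(ZoneOffset.UTC);
    }

    // ISO day of week: Monday = 1 ... Sunday = 7.
    static long isoDow(LocalDateTime ts) {
        return ts.getDayOfWeek().getValue();
    }

    // ISO week-based year, which can differ from the calendar year near January 1st.
    static long isoYear(LocalDateTime ts) {
        return ts.get(IsoFields.WEEK_BASED_YEAR);
    }
}
```

For example, 2018-12-31 is a Monday that already belongs to ISO week 1 of 2019, so under these definitions its isodow is 1 and its isoyear is 2019 while its decade is 201.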
[GitHub] snuyanzin opened a new pull request #6614: [FLINK-9432] Support extract timeunit: DECADE, EPOCH, ISODOW, ISOYEAR…
snuyanzin opened a new pull request #6614: [FLINK-9432] Support extract timeunit: DECADE, EPOCH, ISODOW, ISOYEAR… URL: https://github.com/apache/flink/pull/6614 ## What is the purpose of the change The PR provides an implementation of the `extract` function for the time units `decade`, `epoch`, `isodow`, `isoyear`, `millisecond`, `microsecond` ## Brief change log - support time units in `extract` - documentation updates (mention all possible time units for `extract`) - tests ## Verifying this change - *Added tests validate the result of the `extract` call* - *Added tests validate that `extract` of `isodow`, `isodoy`, `epoch`, `decade` for time is incorrect* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (yes) - If yes, how is the feature documented? (docs) functions.md updated This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Closed] (FLINK-10217) use Slack for user support and questions
[ https://issues.apache.org/jira/browse/FLINK-10217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-10217. Resolution: Won't Fix Slack was already discussed recently on the ML and rejected. > use Slack for user support and questions > > > Key: FLINK-10217 > URL: https://issues.apache.org/jira/browse/FLINK-10217 > Project: Flink > Issue Type: Task > Components: Project Website >Reporter: Nicos Maris >Priority: Critical > > h2. Current status > For user support and questions, users are instructed to subscribe to > u...@flink.apache.org but there are users like me who also enjoy using a chat > channel. However, the instructions to do so are not clear, the IRC > activity is low, and it is definitely [not indicative of the project's > activity|https://issues.apache.org/jira/browse/FLINK-3862?focusedCommentId=16152376=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16152376]. > The [website|https://flink.apache.org/community.html] mentions that "If you > want to talk with the Flink committers and users in a chat, there is an [IRC > channel|https://flink.apache.org/community.html#irc]." > h2. Option 1: Use Slack > An example of an Apache project that is using > [Slack|https://tedium.co/2017/10/17/irc-vs-slack-chat-history] is: > http://mesos.apache.org/community > I can assist in setting it up if at least one expert joins from the very > beginning. > h2. Option 2: Keep using IRC and document it > Add the [missing > section|https://github.com/apache/flink-web/blob/master/community.md#irc] to > the website along with instructions for people who have never used IRC. > h2. Option 3: Use only the mailing list > Use only u...@flink.apache.org for user support and questions and do not > mention IRC on the website. 
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10218) Allow write DataSet without path parameter
Paul Lin created FLINK-10218: Summary: Allow write DataSet without path parameter Key: FLINK-10218 URL: https://issues.apache.org/jira/browse/FLINK-10218 Project: Flink Issue Type: Improvement Components: DataSet API Affects Versions: 1.6.0 Reporter: Paul Lin Currently, the DataSet API has two overloaded write methods for using FileOutputFormat as the output format, and both require a path parameter, but the output path could already be set in the FileOutputFormat object. What's more, the subclasses of FileOutputFormat mostly don't have default constructors and require a path parameter too, so users have to set the output path twice in the code, like: {code:java} String output = "hdfs:///tmp/"; dataset.write(new TextOutputFormat<>(new Path(output)), output); {code} So I propose to add another write method that requires no path parameter. Could someone assign this issue to me? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9311) PubSub connector
[ https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16592832#comment-16592832 ] ASF GitHub Bot commented on FLINK-9311: --- Xeli commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r212798502 ## File path: flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/Bound.java ## @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.pubsub; + +import org.apache.flink.streaming.api.functions.source.SourceFunction; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.Timer; +import java.util.TimerTask; + +class Bound implements Serializable { + private static final Logger LOG = LoggerFactory.getLogger(Bound.class); + + private final Bound.Mode mode; + private final long maxMessagedReceived; + private final long maxTimeBetweenMessages; + + private SourceFunction sourceFunction; + private transient Timer timer; + private long messagesReceived; + private long lastReceivedMessage; + private boolean cancelled = false; + + private Bound(Bound.Mode mode, long maxMessagedReceived, long maxTimeBetweenMessages) { + this.mode = mode; + this.maxMessagedReceived = maxMessagedReceived; + this.maxTimeBetweenMessages = maxTimeBetweenMessages; + this.messagesReceived = 0L; + } + + static Bound boundByAmountOfMessages(long maxMessagedReceived) { + return new Bound<>(Mode.COUNTER, maxMessagedReceived, 0L); + } + + static Bound boundByTimeSinceLastMessage(long maxTimeBetweenMessages) { + return new Bound<>(Mode.TIMER, 0L, maxTimeBetweenMessages); + } + + static Bound boundByAmountOfMessagesOrTimeSinceLastMessage(long maxMessagedReceived, long maxTimeBetweenMessages) { + return new Bound<>(Mode.COUNTER_OR_TIMER, maxMessagedReceived, maxTimeBetweenMessages); + } + + private TimerTask shutdownPubSubSource() { + return new TimerTask() { + @Override + public void run() { + if (maxTimeBetweenMessagesElapsed()) { + cancelPubSubSource("BoundedSourceFunction: Idle timeout --> canceling source"); + timer.cancel(); + } + } + }; + } + + private synchronized boolean maxTimeBetweenMessagesElapsed() { + return System.currentTimeMillis() - lastReceivedMessage > maxTimeBetweenMessages; + } + + private synchronized void cancelPubSubSource(String logMessage) { + if (!cancelled) { + cancelled = true; + 
sourceFunction.cancel(); + LOG.info(logMessage); + } + } + + void start(SourceFunction sourceFunction) { + if (this.sourceFunction != null) { + throw new IllegalStateException("start() already called"); + } + + this.sourceFunction = sourceFunction; + messagesReceived = 0; + + if (mode == Mode.TIMER || mode == Mode.COUNTER_OR_TIMER) { + lastReceivedMessage = System.currentTimeMillis(); + timer = new Timer(); + timer.schedule(shutdownPubSubSource(), 0, 100); + } + } + + synchronized void receivedMessage() { + if (sourceFunction == null) { + throw new IllegalStateException("start() not called"); + } + + lastReceivedMessage = System.currentTimeMillis(); + messagesReceived++; Review comment: What information could
[GitHub] Xeli commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
Xeli commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r212798502 ## File path: flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/Bound.java ## @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.pubsub; + +import org.apache.flink.streaming.api.functions.source.SourceFunction; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.Timer; +import java.util.TimerTask; + +class Bound implements Serializable { + private static final Logger LOG = LoggerFactory.getLogger(Bound.class); + + private final Bound.Mode mode; + private final long maxMessagedReceived; + private final long maxTimeBetweenMessages; + + private SourceFunction sourceFunction; + private transient Timer timer; + private long messagesReceived; + private long lastReceivedMessage; + private boolean cancelled = false; + + private Bound(Bound.Mode mode, long maxMessagedReceived, long maxTimeBetweenMessages) { + this.mode = mode; + this.maxMessagedReceived = maxMessagedReceived; + this.maxTimeBetweenMessages = maxTimeBetweenMessages; + this.messagesReceived = 0L; + } + + static Bound boundByAmountOfMessages(long maxMessagedReceived) { + return new Bound<>(Mode.COUNTER, maxMessagedReceived, 0L); + } + + static Bound boundByTimeSinceLastMessage(long maxTimeBetweenMessages) { + return new Bound<>(Mode.TIMER, 0L, maxTimeBetweenMessages); + } + + static Bound boundByAmountOfMessagesOrTimeSinceLastMessage(long maxMessagedReceived, long maxTimeBetweenMessages) { + return new Bound<>(Mode.COUNTER_OR_TIMER, maxMessagedReceived, maxTimeBetweenMessages); + } + + private TimerTask shutdownPubSubSource() { + return new TimerTask() { + @Override + public void run() { + if (maxTimeBetweenMessagesElapsed()) { + cancelPubSubSource("BoundedSourceFunction: Idle timeout --> canceling source"); + timer.cancel(); + } + } + }; + } + + private synchronized boolean maxTimeBetweenMessagesElapsed() { + return System.currentTimeMillis() - lastReceivedMessage > maxTimeBetweenMessages; + } + + private synchronized void cancelPubSubSource(String logMessage) { + if (!cancelled) { + cancelled = true; + 
sourceFunction.cancel(); + LOG.info(logMessage); + } + } + + void start(SourceFunction sourceFunction) { + if (this.sourceFunction != null) { + throw new IllegalStateException("start() already called"); + } + + this.sourceFunction = sourceFunction; + messagesReceived = 0; + + if (mode == Mode.TIMER || mode == Mode.COUNTER_OR_TIMER) { + lastReceivedMessage = System.currentTimeMillis(); + timer = new Timer(); + timer.schedule(shutdownPubSubSource(), 0, 100); + } + } + + synchronized void receivedMessage() { + if (sourceFunction == null) { + throw new IllegalStateException("start() not called"); + } + + lastReceivedMessage = System.currentTimeMillis(); + messagesReceived++; Review comment: What information could we report as metrics? I was assuming counters for messages received by the PubSubSource would be available as Metrics or is this not the case? This is an automated message