[GitHub] flink issue #4560: Flink 7077
Github user EronWright commented on the issue: https://github.com/apache/flink/pull/4560 @tillrohrmann fixed the test issue and the other noted issues. Once #4555 is in, this is good to go. The test issue was a race between the RM acquiring leadership and the RPC call. Added some code to `TestLeaderElectionService` to wait for the leader acknowledgement, and updated the base RM and Mesos RM to use it. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
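The leadership-vs-RPC race can be illustrated with a minimal sketch. It assumes a hypothetical `LeaderAckService` (not the real `TestLeaderElectionService` API): granting leadership returns a future that completes only after the contender confirms the session ID, so a test can wait on it before sending its first RPC.

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

/** Hypothetical test helper; names and signatures are illustrative, not Flink's API. */
final class LeaderAckService {
    // Future for the most recent leadership grant; completed when the contender acknowledges it.
    private volatile CompletableFuture<UUID> confirmation = new CompletableFuture<>();

    /**
     * Grants leadership and returns a future that completes only once the contender
     * has confirmed the session ID. Tests wait on it before issuing RPCs.
     */
    CompletableFuture<UUID> grantLeadership(UUID sessionId, Consumer<UUID> contender) {
        CompletableFuture<UUID> pending = new CompletableFuture<>();
        confirmation = pending;      // install before notifying, so an early confirm is not lost
        contender.accept(sessionId); // the contender may confirm asynchronously
        return pending;
    }

    /** Called by the contender (e.g. the RM) once it has finished taking over leadership. */
    void confirmLeaderSessionId(UUID sessionId) {
        confirmation.complete(sessionId);
    }
}
```

In a test, `grantLeadership(...).join()` (or a timed `get`) would then replace any sleep before the first RPC call.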
[jira] [Comment Edited] (FLINK-7479) Support to retrieve the past event by physical offset
[ https://issues.apache.org/jira/browse/FLINK-7479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16133888#comment-16133888 ] Dian Fu edited comment on FLINK-7479 at 8/19/17 2:27 AM: - Hi [~kkl0u], Thanks a lot for your reply. {quote} Before opening a PR for a feature, we first have to discuss if we want to include that feature in the library. If we do not go like this, then the only thing that will happen is that we pile up PRs that we do not know how to handle at the end. {quote} Agreed. Next time I will not open a PR until we agree on the feature. Thanks for the reminder. :) {quote} I have not looked at the code itself, but from the topic, this just seems to be syntactic sugar on top of the existing APIs. {quote} I don't think so. The tests in the PR show that this functionality cannot be achieved on top of the existing APIs. {quote} Until users start using the library and ask for features, and given that this is a first iteration of the CEP/SQL integration, I would suggest keeping it minimal and avoiding stuff that only contributes "facilitation methods" for already existing functionality. {quote} Totally agree. I will try to keep the CEP/SQL integration feature minimal and only add features when required. For the {{PREV}} clause, I think it should be included in the minimal feature set, as it is an important building block for pattern definition. > Support to retrieve the past event by physical offset > -- > > Key: FLINK-7479 > URL: https://issues.apache.org/jira/browse/FLINK-7479 > Project: Flink > Issue Type: Sub-task > Components: CEP, Table API & SQL >Reporter: Dian Fu >Assignee: Dian Fu > > Currently, it is already possible to retrieve events matched to the specified > pattern in {{IterativeCondition.Context}}. However, there are also requirements > to retrieve events by a physical offset. The retrieved events may not be > matched to any pattern. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
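Retrieving an event "by physical offset" means looking back N input events regardless of whether they matched any pattern, which is what SQL's `PREV` navigation does. A minimal sketch, assuming a hypothetical bounded history buffer (this is not Flink's `IterativeCondition.Context` API): every event is recorded, and `prev(n)` returns the event n positions before the latest one.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.Optional;

/** Hypothetical buffer of the last N events, kept independently of pattern matching. */
final class PhysicalOffsetBuffer<T> {
    private final int capacity;
    private final Deque<T> recent = new ArrayDeque<>();

    PhysicalOffsetBuffer(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    /** Records every incoming event, evicting the oldest once the buffer is full. */
    void add(T event) {
        if (recent.size() == capacity) {
            recent.removeFirst();
        }
        recent.addLast(event);
    }

    /** Event {@code offset} positions before the latest one (0 = latest); empty if out of range. */
    Optional<T> prev(int offset) {
        if (offset < 0 || offset >= recent.size()) {
            return Optional.empty();
        }
        Iterator<T> newestFirst = recent.descendingIterator();
        for (int i = 0; i < offset; i++) {
            newestFirst.next();
        }
        return Optional.of(newestFirst.next());
    }
}
```

The capacity bound matters in a streaming setting: the maximum offset a pattern can reference determines how much history must be retained.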
[jira] [Commented] (FLINK-7479) Support to retrieve the past event by physical offset
[ https://issues.apache.org/jira/browse/FLINK-7479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16133888#comment-16133888 ] Dian Fu commented on FLINK-7479: Hi [~kkl0u], {quote} Before opening a PR for a feature, we first have to discuss if we want to include that feature in the library. If we do not go like this, then the only thing that will happen is that we pile up PRs that we do not know how to handle at the end. {quote} Agreed. Next time I will not open a PR until we agree on the feature. Thanks for the reminder. :) {quote} I have not looked at the code itself, but from the topic, this just seems to be syntactic sugar on top of the existing APIs. {quote} I don't think so. The tests in the PR show that this functionality cannot be achieved on top of the existing APIs. {quote} Until users start using the library and ask for features, and given that this is a first iteration of the CEP/SQL integration, I would suggest keeping it minimal and avoiding stuff that only contributes "facilitation methods" for already existing functionality. {quote} Totally agree. I will try to keep the CEP/SQL integration feature minimal and only add features when required. For the {{PREV}} clause, I think it should be included in the minimal feature set, as it is an important building block for pattern definition.
[jira] [Commented] (FLINK-7479) Support to retrieve the past event by physical offset
[ https://issues.apache.org/jira/browse/FLINK-7479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16133628#comment-16133628 ] Kostas Kloudas commented on FLINK-7479: --- Hi [~dian.fu], Before opening a PR for a feature, we first have to discuss if we want to include that feature in the library. If we do not go like this, then the only thing that will happen is that we pile up PRs that we do not know how to handle at the end. I have not looked at the code itself, but from the topic, this just seems to be syntactic sugar on top of the existing APIs. It may be necessary for completeness, as it is in the SQL specification, but a specification is just a specification until users start using it or asking for it. Until users start using the library and ask for features, and given that this is a first iteration of the CEP/SQL integration, I would suggest keeping it minimal and avoiding stuff that only contributes "facilitation methods" for already existing functionality.
[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector
[ https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16133557#comment-16133557 ] ASF GitHub Bot commented on FLINK-6988: --- Github user rangadi commented on a diff in the pull request: https://github.com/apache/flink/pull/4239#discussion_r134044031 --- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java --- @@ -0,0 +1,1000 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.ClosureCleaner; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.util.SerializableObject; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction; +import org.apache.flink.streaming.api.operators.StreamSink; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer; +import org.apache.flink.streaming.connectors.kafka.internal.metrics.KafkaMetricMuttableWrapper; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; +import org.apache.flink.streaming.util.serialization.SerializationSchema; +import org.apache.flink.util.ExceptionUtils; +import 
org.apache.flink.util.NetUtils; + +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.errors.InvalidTxnStateException; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.Closeable; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.atomic.AtomicLong; + +import static java.util.Objects.requireNonNull; + +/** + * Flink Sink to produce data into a Kafka topic. This producer is
[GitHub] flink pull request #4239: [FLINK-6988] flink-connector-kafka-0.11 with exact...
Github user rangadi commented on a diff in the pull request: https://github.com/apache/flink/pull/4239#discussion_r134044031 --- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java --- @@ -0,0 +1,1000 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.ClosureCleaner; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.util.SerializableObject; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction; +import org.apache.flink.streaming.api.operators.StreamSink; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer; +import org.apache.flink.streaming.connectors.kafka.internal.metrics.KafkaMetricMuttableWrapper; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; +import org.apache.flink.streaming.util.serialization.SerializationSchema; +import org.apache.flink.util.ExceptionUtils; +import 
org.apache.flink.util.NetUtils; + +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.errors.InvalidTxnStateException; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.Closeable; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.atomic.AtomicLong; + +import static java.util.Objects.requireNonNull; + +/** + * Flink Sink to produce data into a Kafka topic. This producer is compatible with Kafka 0.11.x. By default producer + * will use {@link Semantic#EXACTLY_ONCE} semantic. + * + * Implementation note: This producer is a hybrid between a regular regular + * {@link
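The diff shows the new producer building on `TwoPhaseCommitSinkFunction` and defaulting to `Semantic#EXACTLY_ONCE`. The general two-phase-commit lifecycle can be modelled in memory with a minimal sketch. The method names are loosely borrowed from Flink's sink and checkpoint hooks, but the class and its signatures are hypothetical simplifications; the real producer uses Kafka transactions rather than in-memory lists.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical in-memory model of a two-phase-commit sink; not Flink's TwoPhaseCommitSinkFunction. */
final class TwoPhaseCommitSketch {
    private final List<String> committed = new ArrayList<>();                    // visible to readers
    private final NavigableMap<Long, List<String>> pendingByCheckpoint = new TreeMap<>(); // pre-committed
    private List<String> current = new ArrayList<>();                            // open transaction

    /** Writes go into the currently open transaction only. */
    void invoke(String record) {
        current.add(record);
    }

    /** Phase 1 (on checkpoint): pre-commit the open transaction and start a new one. */
    void snapshotState(long checkpointId) {
        pendingByCheckpoint.put(checkpointId, current);
        current = new ArrayList<>();
    }

    /** Phase 2 (checkpoint completed): commit every pending transaction up to this checkpoint. */
    void notifyCheckpointComplete(long checkpointId) {
        NavigableMap<Long, List<String>> ready = pendingByCheckpoint.headMap(checkpointId, true);
        for (List<String> txn : ready.values()) {
            committed.addAll(txn);
        }
        ready.clear(); // the view is backed by the map, so this drops the committed entries
    }

    /** On failure, the open transaction is discarded; its records never become visible. */
    void abortCurrent() {
        current = new ArrayList<>();
    }

    List<String> committed() {
        return Collections.unmodifiableList(committed);
    }
}
```

Records only become visible once the checkpoint that covers them has completed, which is where the exactly-once guarantee comes from.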
[GitHub] flink pull request #4560: Flink 7077
Github user EronWright commented on a diff in the pull request: https://github.com/apache/flink/pull/4560#discussion_r134004745 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java --- @@ -68,7 +68,9 @@ public boolean isIdle() { } public void markIdle() { - idleSince = System.currentTimeMillis(); + if (!isIdle()) { + idleSince = System.currentTimeMillis(); + } --- End diff -- The reason the SM idleness unit tests didn't catch this is that `markIdle` is called whenever a slot report is received, which in practice happens continuously but is hard to simulate. To be clear, there are idleness tests, but I didn't see an obvious way to improve them.
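The fix makes `markIdle` idempotent: repeated slot reports must not keep resetting the idle timestamp. A minimal sketch, assuming a hypothetical `IdleTracker` whose `isIdle()` means "idle timestamp set" (the real class may encode this differently). The injectable clock is an illustrative assumption showing one way repeated slot reports could be simulated in a unit test.

```java
import java.util.function.LongSupplier;

/** Hypothetical stand-in for TaskManagerRegistration's idleness tracking. */
final class IdleTracker {
    private static final long NOT_IDLE = Long.MAX_VALUE;

    private final LongSupplier clock; // injectable clock, so tests can advance time explicitly
    private long idleSince = NOT_IDLE;

    IdleTracker(LongSupplier clock) {
        this.clock = clock;
    }

    boolean isIdle() {
        return idleSince != NOT_IDLE;
    }

    /** Idempotent: a repeated call (e.g. per slot report) keeps the original timestamp. */
    void markIdle() {
        if (!isIdle()) {
            idleSince = clock.getAsLong();
        }
    }

    /** Called when a slot is allocated; the next markIdle starts a fresh idle period. */
    void markUsed() {
        idleSince = NOT_IDLE;
    }

    long getIdleSince() {
        return idleSince;
    }
}
```

Calling `markIdle()` several times while advancing the fake clock reproduces the "slot report arrives continuously" situation without a running cluster.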
[GitHub] flink pull request #4560: Flink 7077
Github user EronWright commented on a diff in the pull request: https://github.com/apache/flink/pull/4560#discussion_r134003593 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java --- @@ -960,12 +965,26 @@ public void handleError(final Exception exception) { @Override public void releaseResource(InstanceID instanceId) { - stopWorker(instanceId); + runAsync(new Runnable() { + @Override + public void run() { + for (Map.Entryentry : taskExecutors.entrySet()) { + if (entry.getValue().getInstanceID().equals(instanceId)) { + stopWorker(entry.getKey()); --- End diff -- I was thinking along the same lines: the slot manager mostly deals with `InstanceID`, and its log lines are hard to correlate with the lower-level, `ResourceID`-based resource manager information. It would be a nice improvement to make `InstanceID` a composite key that includes `ResourceID`. Regardless, I think the `stopWorker` method should use `ResourceID`, because `InstanceID` isn't a concept exposed to the RM subclasses.
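The loop in the diff translates a slot-manager `InstanceID` into the `ResourceID` that `stopWorker` expects by scanning the registered task executors. A minimal sketch of that lookup, assuming IDs are modelled as plain strings and a hypothetical `Registration` type; it returns an `Optional` instead of calling `stopWorker` directly so that the "not found" case is explicit.

```java
import java.util.Map;
import java.util.Optional;

final class WorkerLookup {
    /** Hypothetical registration record holding the slot manager's InstanceID. */
    static final class Registration {
        final String instanceId;

        Registration(String instanceId) {
            this.instanceId = instanceId;
        }
    }

    /** Linear scan over ResourceID -> Registration, mirroring the loop in the diff. */
    static Optional<String> resourceIdFor(Map<String, Registration> taskExecutors, String instanceId) {
        for (Map.Entry<String, Registration> entry : taskExecutors.entrySet()) {
            if (entry.getValue().instanceId.equals(instanceId)) {
                return Optional.of(entry.getKey());
            }
        }
        return Optional.empty(); // the executor may already have been unregistered
    }
}
```

The scan is O(number of task executors) per release. That is usually fine, but it is one motivation for making the `InstanceID` carry its `ResourceID` directly.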
[jira] [Commented] (FLINK-7442) Add option for using a child-first classloader for loading user code
[ https://issues.apache.org/jira/browse/FLINK-7442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16133184#comment-16133184 ] ASF GitHub Bot commented on FLINK-7442: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/4564 Thanks, this looks better to me! I would suggest not passing the configuration into the library cache manager. I think passing configuration objects into specialized / dedicated components is an anti-pattern: it complicates testing, makes signatures less explicit, etc. Regarding the second task of the classloader, loading resources: do we have a test that validates resource resolution? I think this passes the tests of the previous pull request because, for resources, it behaves like a *parent-first classloader*, which seems inconsistent. Admittedly, the existing implementation of the child-first classloader behaved like a child-only classloader for resources, which was also not correct. > Add option for using a child-first classloader for loading user code > > > Key: FLINK-7442 > URL: https://issues.apache.org/jira/browse/FLINK-7442 > Project: Flink > Issue Type: Improvement > Components: Local Runtime >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek >
[GitHub] flink issue #4564: [FLINK-7442] Add option for using child-first classloader...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/4564 Thanks, this looks better to me! I would suggest not passing the configuration into the library cache manager. I think passing configuration objects into specialized / dedicated components is an anti-pattern: it complicates testing, makes signatures less explicit, etc. Regarding the second task of the classloader, loading resources: do we have a test that validates resource resolution? I think this passes the tests of the previous pull request because, for resources, it behaves like a *parent-first classloader*, which seems inconsistent. Admittedly, the existing implementation of the child-first classloader behaved like a child-only classloader for resources, which was also not correct.
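The review's resource point is that a child-first loader should resolve resources child-first too, and then fall back to the parent rather than stopping at the child (child-only). A minimal sketch on top of the standard `URLClassLoader`, assuming a hypothetical list of always-parent-first package prefixes; this is not Flink's actual implementation.

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.List;

/** Minimal child-first classloader sketch; expects a non-null parent. */
final class ChildFirstClassLoaderSketch extends URLClassLoader {
    // Hypothetical prefixes that must always come from the parent so core types stay shared.
    private static final String[] ALWAYS_PARENT_FIRST = {"java.", "javax.", "org.slf4j."};

    ChildFirstClassLoaderSketch(URL[] urls, ClassLoader parent) {
        super(urls, parent);
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        synchronized (getClassLoadingLock(name)) {
            Class<?> c = findLoadedClass(name);
            if (c == null && !isParentFirst(name)) {
                try {
                    c = findClass(name); // user-code jars first
                } catch (ClassNotFoundException ignored) {
                    // not in the child: fall back to the parent below
                }
            }
            if (c == null) {
                c = super.loadClass(name, false); // standard parent-first delegation
            }
            if (resolve) {
                resolveClass(c);
            }
            return c;
        }
    }

    @Override
    public URL getResource(String name) {
        URL url = findResource(name); // child first, consistent with class loading
        return url != null ? url : super.getResource(name); // then the parent, so not child-only
    }

    @Override
    public Enumeration<URL> getResources(String name) throws IOException {
        List<URL> all = new ArrayList<>(Collections.list(findResources(name))); // child's first
        all.addAll(Collections.list(getParent().getResources(name)));
        return Collections.enumeration(all);
    }

    private static boolean isParentFirst(String name) {
        for (String prefix : ALWAYS_PARENT_FIRST) {
            if (name.startsWith(prefix)) {
                return true;
            }
        }
        return false;
    }
}
```

A resource-resolution test along the lines requested would put the same resource name in both a child jar and the parent classpath and assert that `getResource` returns the child's URL.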
[jira] [Commented] (FLINK-7477) Use "hadoop classpath" to augment classpath when available
[ https://issues.apache.org/jira/browse/FLINK-7477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16133174#comment-16133174 ] ASF GitHub Bot commented on FLINK-7477: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4566 @rmetzger I addressed your comments. > Use "hadoop classpath" to augment classpath when available > -- > > Key: FLINK-7477 > URL: https://issues.apache.org/jira/browse/FLINK-7477 > Project: Flink > Issue Type: Bug > Components: Startup Shell Scripts >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek > Fix For: 1.4.0 > > > Currently, some cloud environments don't properly put the Hadoop jars into > {{HADOOP_CLASSPATH}} (or don't set {{HADOOP_CLASSPATH}} at all). We should > check in {{config.sh}} if the {{hadoop}} binary is on the path and augment > our {{INTERNAL_HADOOP_CLASSPATHS}} with the result of {{hadoop classpath}} in > our scripts. > This will improve the out-of-the-box experience for users who otherwise have to > manually set {{HADOOP_CLASSPATH}}.
[GitHub] flink issue #4566: [FLINK-7477] [FLINK-7480] Various improvements to Flink s...
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4566 @rmetzger I addressed your comments.
[jira] [Commented] (FLINK-7423) Always reuse an instance to get elements from the inputFormat
[ https://issues.apache.org/jira/browse/FLINK-7423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16133142#comment-16133142 ] ASF GitHub Bot commented on FLINK-7423: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/4525 I think `null` values are not really permitted at the moment, although the InputFormats do not explicitly forbid them. That's why the logic is not "run until returns null", but "run until reachedEnd()". The change here implements a mixed contract like "run until reachedEnd() or returns null", which is more complicated to document, enforce, test, and understand for users. > Always reuse an instance to get elements from the inputFormat > --- > > Key: FLINK-7423 > URL: https://issues.apache.org/jira/browse/FLINK-7423 > Project: Flink > Issue Type: Bug > Components: DataStream API >Reporter: Xu Pingyong >Assignee: Xu Pingyong > > In InputFormatSourceFunction.java: > {code:java} > OUT nextElement = serializer.createInstance(); > while (isRunning) { > format.open(splitIterator.next()); > // for each element we also check if cancel > // was called by checking the isRunning flag > while (isRunning && !format.reachedEnd()) { > nextElement = > format.nextRecord(nextElement); > if (nextElement != null) { > ctx.collect(nextElement); > } else { > break; > } > } > format.close(); > completedSplitsCounter.inc(); > if (isRunning) { > isRunning = splitIterator.hasNext(); > } > } > {code} > The format may return a different element, or null, from nextRecord, which may > cause an exception.
[GitHub] flink issue #4525: [FLINK-7423] Always reuse an instance to get elements fro...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/4525 I think `null` values are not really permitted at the moment, although the InputFormats do not explicitly forbid them. That's why the logic is not "run until returns null", but "run until reachedEnd()". The change here implements a mixed contract like "run until reachedEnd() or returns null", which is more complicated to document, enforce, test, and understand for users.
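Keeping a single termination condition ("run until reachedEnd()") while always passing a dedicated reuse instance can be shown with a minimal sketch. It assumes a trimmed-down, hypothetical `SimpleInputFormat` interface (not Flink's `InputFormat`). Skipping a `null` result rather than treating it as end-of-input is one possible choice made here for illustration; throwing on `null` would be the stricter alternative.

```java
import java.util.function.Consumer;

/** Hypothetical, trimmed-down input format contract (not Flink's InputFormat). */
interface SimpleInputFormat<T> {
    boolean reachedEnd();

    /** May return {@code reuse}, a different instance, or (undesirably) null. */
    T nextRecord(T reuse);
}

final class ReadLoop {
    /**
     * Single termination condition: run until reachedEnd(). The dedicated reuse object is
     * passed every time, so a null or foreign instance returned by the format is never fed back.
     */
    static <T> void readAll(SimpleInputFormat<T> format, T reuse, Consumer<T> collector) {
        while (!format.reachedEnd()) {
            T record = format.nextRecord(reuse);
            if (record != null) {
                collector.accept(record);
            }
            // null is skipped, not interpreted as end-of-input (an assumption of this sketch)
        }
    }
}
```

With the loop in the issue description, a `null` in the middle of a split ends that split early; here, the records after it are still read.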
[jira] [Commented] (FLINK-7477) Use "hadoop classpath" to augment classpath when available
[ https://issues.apache.org/jira/browse/FLINK-7477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16133120#comment-16133120 ] ASF GitHub Bot commented on FLINK-7477: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/4566#discussion_r133985865 --- Diff: flink-dist/src/main/flink-bin/bin/config.sh --- @@ -351,8 +351,20 @@ if [ -z "$HADOOP_CONF_DIR" ]; then fi fi +# try and set HADOOP_CONF_DIR to some common default if it's not set +if [ -z "$HADOOP_CONF_DIR" ]; then +if [ -d "/etc/hadoop/conf" ]; then +HADOOP_CONF_DIR="/etc/hadoop/conf" +fi +fi + INTERNAL_HADOOP_CLASSPATHS="${HADOOP_CLASSPATH}:${HADOOP_CONF_DIR}:${YARN_CONF_DIR}" +# check if the "hadoop" binary is available, if yes, use that to augment the CLASSPATH +if command -v hadoop >/dev/null 2>&1; then + INTERNAL_HADOOP_CLASSPATHS="${HADOOP_CLASSPATH}:${HADOOP_CONF_DIR}:${YARN_CONF_DIR}:`hadoop classpath`" --- End diff -- I would actually append to `INTERNAL_HADOOP_CLASSPATHS` instead of overwriting it.
[GitHub] flink pull request #4566: [FLINK-7477] [FLINK-7480] Various improvements to ...
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/4566#discussion_r133985865 --- Diff: flink-dist/src/main/flink-bin/bin/config.sh --- @@ -351,8 +351,20 @@ if [ -z "$HADOOP_CONF_DIR" ]; then fi fi +# try and set HADOOP_CONF_DIR to some common default if it's not set +if [ -z "$HADOOP_CONF_DIR" ]; then +if [ -d "/etc/hadoop/conf" ]; then +HADOOP_CONF_DIR="/etc/hadoop/conf" +fi +fi + INTERNAL_HADOOP_CLASSPATHS="${HADOOP_CLASSPATH}:${HADOOP_CONF_DIR}:${YARN_CONF_DIR}" +# check if the "hadoop" binary is available, if yes, use that to augment the CLASSPATH +if command -v hadoop >/dev/null 2>&1; then + INTERNAL_HADOOP_CLASSPATHS="${HADOOP_CLASSPATH}:${HADOOP_CONF_DIR}:${YARN_CONF_DIR}:`hadoop classpath`" --- End diff -- I would actually append to `INTERNAL_HADOOP_CLASSPATHS` instead of overwriting it.
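The review asks for the `hadoop classpath` output to be appended to the already-computed `INTERNAL_HADOOP_CLASSPATHS` rather than rebuilding the value from its parts. The real change belongs in `config.sh`. The sketch below renders the same append-if-available logic in Java purely for illustration; the class and method names are hypothetical.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.StringJoiner;
import java.util.stream.Collectors;

/** Hypothetical illustration of config.sh's classpath augmentation. */
final class HadoopClasspath {
    /** Appends {@code extra} to {@code existing} with ':' and skips empty parts; never overwrites. */
    static String append(String existing, String extra) {
        StringJoiner joiner = new StringJoiner(":");
        if (existing != null && !existing.isEmpty()) {
            joiner.add(existing);
        }
        if (extra != null && !extra.isEmpty()) {
            joiner.add(extra);
        }
        return joiner.toString();
    }

    /** Runs `hadoop classpath` if the binary is on the PATH; empty if it is missing or fails. */
    static Optional<String> hadoopClasspath() {
        try {
            Process p = new ProcessBuilder("hadoop", "classpath")
                    .redirectError(ProcessBuilder.Redirect.INHERIT)
                    .start();
            String out;
            try (BufferedReader r = new BufferedReader(
                    new InputStreamReader(p.getInputStream(), StandardCharsets.UTF_8))) {
                out = r.lines().collect(Collectors.joining()).trim();
            }
            return p.waitFor() == 0 ? Optional.of(out) : Optional.empty();
        } catch (IOException e) {
            return Optional.empty(); // binary not found on the PATH
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Optional.empty();
        }
    }
}
```

Usage would be `internal = hadoopClasspath().map(cp -> append(internal, cp)).orElse(internal)`, which leaves any earlier additions to the variable intact.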
[GitHub] flink pull request #4566: [FLINK-7477] [FLINK-7480] Various improvements to ...
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/4566#discussion_r133985612 --- Diff: flink-dist/src/main/flink-bin/bin/config.sh --- @@ -351,8 +351,20 @@ if [ -z "$HADOOP_CONF_DIR" ]; then fi fi +# try and set HADOOP_CONF_DIR to some common default if it's not set +if [ -z "$HADOOP_CONF_DIR" ]; then +if [ -d "/etc/hadoop/conf" ]; then +HADOOP_CONF_DIR="/etc/hadoop/conf" --- End diff -- I would suggest printing a message to tell the user that we are using this HADOOP_CONF_DIR.
[jira] [Commented] (FLINK-7477) Use "hadoop classpath" to augment classpath when available
[ https://issues.apache.org/jira/browse/FLINK-7477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16133119#comment-16133119 ] ASF GitHub Bot commented on FLINK-7477: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/4566#discussion_r133985612 --- Diff: flink-dist/src/main/flink-bin/bin/config.sh --- @@ -351,8 +351,20 @@ if [ -z "$HADOOP_CONF_DIR" ]; then fi fi +# try and set HADOOP_CONF_DIR to some common default if it's not set +if [ -z "$HADOOP_CONF_DIR" ]; then +if [ -d "/etc/hadoop/conf" ]; then +HADOOP_CONF_DIR="/etc/hadoop/conf" --- End diff -- I would suggest printing a message to tell the user that we are using this HADOOP_CONF_DIR.
[jira] [Commented] (FLINK-7480) Set HADOOP_CONF_DIR to sane default if not set
[ https://issues.apache.org/jira/browse/FLINK-7480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16133118#comment-16133118 ] Robert Metzger commented on FLINK-7480: --- +1 > Set HADOOP_CONF_DIR to sane default if not set > -- > > Key: FLINK-7480 > URL: https://issues.apache.org/jira/browse/FLINK-7480 > Project: Flink > Issue Type: Improvement > Components: Startup Shell Scripts >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek > Fix For: 1.4.0 > > > Currently, neither AWS nor GCE sets {{HADOOP_CONF_DIR}} by default. > This makes the out-of-the-box experience in these cloud environments poor, because > leaving it unset results in errors whose cause is not obvious. > In case {{HADOOP_CONF_DIR}} is not set, we should check whether > {{/etc/hadoop/conf}} exists and set {{HADOOP_CONF_DIR}} to that.
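The fallback FLINK-7480 asks for, combined with the review suggestion to tell the user which directory was chosen, can be sketched as a small pure function. This is a hypothetical Java rendering for illustration only: the actual change is shell code in `config.sh`, and `HadoopConfDirResolver`, `resolve` and the log wording are invented names. The fallback directory is a parameter so the logic can be exercised without touching `/etc`.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;

/** Hypothetical Java rendering of the HADOOP_CONF_DIR fallback; the real change lives in config.sh. */
final class HadoopConfDirResolver {

    /** The common default location named in FLINK-7480. */
    static final Path DEFAULT_CONF_DIR = Paths.get("/etc/hadoop/conf");

    private HadoopConfDirResolver() {}

    static Optional<Path> resolve(Map<String, String> env, Path fallback, Consumer<String> log) {
        String configured = env.get("HADOOP_CONF_DIR");
        if (configured != null && !configured.isEmpty()) {
            // Mirrors `[ -z "$HADOOP_CONF_DIR" ]`: an explicit, non-empty setting always wins.
            return Optional.of(Paths.get(configured));
        }
        if (Files.isDirectory(fallback)) {
            // Tell the user which directory was picked, as suggested in the review of PR #4566.
            log.accept("HADOOP_CONF_DIR was not set; using " + fallback);
            return Optional.of(fallback);
        }
        // Neither set nor present: leave it unset, exactly as before.
        return Optional.empty();
    }
}
```

A call site would look like `HadoopConfDirResolver.resolve(System.getenv(), HadoopConfDirResolver.DEFAULT_CONF_DIR, System.out::println)`.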
[GitHub] flink pull request #4560: Flink 7077
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4560#discussion_r133980216 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java --- @@ -960,12 +965,26 @@ public void handleError(final Exception exception) { @Override public void releaseResource(InstanceID instanceId) { - stopWorker(instanceId); + runAsync(new Runnable() { + @Override + public void run() { + for (Map.Entryentry : taskExecutors.entrySet()) { + if (entry.getValue().getInstanceID().equals(instanceId)) { + stopWorker(entry.getKey()); --- End diff -- In the future we should make one of these IDs composed of the other. Then we could easily obtain the `ResourceID` from the `InstanceID`.
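The diff finds the `ResourceID` for an `InstanceID` by scanning every registered task executor. Until the IDs are composed as suggested, one hedged alternative is a reverse index kept in sync on registration. In the sketch below, plain `String`s stand in for Flink's `ResourceID` and `InstanceID`, and `TaskExecutorIndex` is a hypothetical class. It is not thread-safe; like the diff, which wraps the lookup in `runAsync`, it assumes all access happens on a single thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
 * Hypothetical registry with a reverse index from instance id to resource id.
 * Strings stand in for Flink's ResourceID and InstanceID types.
 */
final class TaskExecutorIndex {

    private final Map<String, String> instanceByResource = new HashMap<>();
    private final Map<String, String> resourceByInstance = new HashMap<>();

    void register(String resourceId, String instanceId) {
        String previous = instanceByResource.put(resourceId, instanceId);
        if (previous != null) {
            // Re-registration of the same resource replaces its old instance id.
            resourceByInstance.remove(previous);
        }
        resourceByInstance.put(instanceId, resourceId);
    }

    /** Linear scan, equivalent in spirit to the loop in the diff. */
    Optional<String> findByScan(String instanceId) {
        for (Map.Entry<String, String> entry : instanceByResource.entrySet()) {
            if (entry.getValue().equals(instanceId)) {
                return Optional.of(entry.getKey());
            }
        }
        return Optional.empty();
    }

    /** Constant-time lookup through the reverse index. */
    Optional<String> findByIndex(String instanceId) {
        return Optional.ofNullable(resourceByInstance.get(instanceId));
    }

    void unregister(String resourceId) {
        String instanceId = instanceByResource.remove(resourceId);
        if (instanceId != null) {
            resourceByInstance.remove(instanceId);
        }
    }
}
```

The trade-off is a second map that must be updated on every register and unregister; composing the IDs, as the reviewer proposes, would avoid that bookkeeping entirely.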
[GitHub] flink pull request #4560: Flink 7077
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4560#discussion_r133980386 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java --- @@ -68,7 +68,9 @@ public boolean isIdle() { } public void markIdle() { - idleSince = System.currentTimeMillis(); + if (!isIdle()) { + idleSince = System.currentTimeMillis(); + } --- End diff -- good catch!
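The guard in `markIdle()` keeps the first idle timestamp when the registration is already idle. Presumably, without it, every repeated call would push `idleSince` forward and could postpone an idle timeout indefinitely. A minimal, hypothetical model of that bookkeeping follows. `IdleTracker` is an invented name, the `Long.MAX_VALUE` "not idle" sentinel is an assumption (the diff does not show how `isIdle()` is implemented), and the current time is passed in rather than read from `System.currentTimeMillis()` so the behavior can be tested.

```java
/**
 * Hypothetical, simplified model of the idle bookkeeping in TaskManagerRegistration.
 * Long.MAX_VALUE as the "not idle" marker is an assumption, not taken from the diff.
 */
final class IdleTracker {

    private static final long NOT_IDLE = Long.MAX_VALUE;

    private long idleSince = NOT_IDLE;

    boolean isIdle() {
        return idleSince != NOT_IDLE;
    }

    /** Only the first call starts the idle period; later calls keep the original timestamp. */
    void markIdle(long nowMillis) {
        if (!isIdle()) {
            idleSince = nowMillis;
        }
    }

    /** A slot was allocated again, so the registration is no longer idle. */
    void markUsed() {
        idleSince = NOT_IDLE;
    }

    long getIdleSince() {
        return idleSince;
    }
}
```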
[jira] [Commented] (FLINK-7402) Ineffective null check in NettyMessage#write()
[ https://issues.apache.org/jira/browse/FLINK-7402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16133081#comment-16133081 ] ASF GitHub Bot commented on FLINK-7402: --- Github user tedyu commented on a diff in the pull request: https://github.com/apache/flink/pull/4562#discussion_r133980441 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java --- @@ -220,6 +220,10 @@ void releaseBuffer() { @Override ByteBuf write(ByteBufAllocator allocator) throws IOException { + if (null == buffer) { + throw new NullPointerException(); --- End diff -- Add an exception message to provide more information. > Ineffective null check in NettyMessage#write() > -- > > Key: FLINK-7402 > URL: https://issues.apache.org/jira/browse/FLINK-7402 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Priority: Minor > > Here is the null check in the finally block: > {code} > finally { > if (buffer != null) { > buffer.recycle(); > } > {code} > But {{buffer}} has already been dereferenced in the try block without a guard, so the check comes too late.
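A common way to address both points (check before the first dereference, and include a message as the review asks) is `java.util.Objects.requireNonNull(obj, message)`. The sketch below is hypothetical: `BufferWriter` and its nested `Buffer` interface stand in for `NettyMessage` and Flink's network buffer, and `size()` stands in for the actual serialization. Flink code would more likely use its own `Preconditions.checkNotNull`, but the JDK call keeps the example self-contained.

```java
import java.util.Objects;

/** Hypothetical stand-in for the write path in NettyMessage. */
final class BufferWriter {

    /** Stand-in for Flink's network buffer; only the two calls the sketch needs. */
    interface Buffer {
        int size();

        void recycle();
    }

    private BufferWriter() {}

    static int write(Buffer buffer) {
        // Fail fast, before any dereference, with a descriptive message
        // instead of a bare NullPointerException.
        Objects.requireNonNull(buffer, "buffer must not be null");
        try {
            return buffer.size(); // stands in for serializing the buffer contents
        } finally {
            // buffer is known to be non-null here, so no null check is needed
            buffer.recycle();
        }
    }
}
```

With the check moved to the top, the `if (buffer != null)` guard in the `finally` block becomes redundant rather than ineffective.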
[GitHub] flink pull request #4562: [FLINK-7402] Fix ineffective null check in NettyMe...
Github user tedyu commented on a diff in the pull request: https://github.com/apache/flink/pull/4562#discussion_r133980441 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java --- @@ -220,6 +220,10 @@ void releaseBuffer() { @Override ByteBuf write(ByteBufAllocator allocator) throws IOException { + if (null == buffer) { + throw new NullPointerException(); --- End diff -- Add an exception message to provide more information.
[jira] [Commented] (FLINK-6630) Implement FLIP-6 MesosAppMasterRunner
[ https://issues.apache.org/jira/browse/FLINK-6630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16133058#comment-16133058 ] ASF GitHub Bot commented on FLINK-6630: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4555#discussion_r133975100 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java --- @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.entrypoint; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager; +import org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters; +import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices; +import org.apache.flink.mesos.runtime.clusterframework.services.MesosServicesUtils; +import org.apache.flink.mesos.util.MesosConfiguration; +import org.apache.flink.runtime.blob.BlobServer; +import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.clusterframework.ContainerSpecification; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.resourcemanager.ResourceManager; +import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration; +import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices; +import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.util.FlinkException; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.ObjectInputStream; +import 
java.util.Map; + +/** + * Entry point for Mesos per-job clusters. + */ +public class MesosJobClusterEntrypoint extends JobClusterEntrypoint { + + public static final String JOB_GRAPH_FILE_PATH = "flink.jobgraph.path"; + + // + // Command-line options + // + + private static final Options ALL_OPTIONS; + + static { + ALL_OPTIONS = + new Options() + .addOption(BootstrapTools.newDynamicPropertiesOption()); + } + + private MesosConfiguration schedulerConfiguration; + + private MesosServices mesosServices; + + private MesosTaskManagerParameters taskManagerParameters; + + private ContainerSpecification taskManagerContainerSpec; + + public MesosJobClusterEntrypoint(Configuration config) { + super(config); + } + + @Override + protected void initializeServices(Configuration config) throws Exception { + super.initializeServices(config); + + final String hostname = config.getString(JobManagerOptions.ADDRESS); + + // Mesos configuration + schedulerConfiguration =
[jira] [Commented] (FLINK-6630) Implement FLIP-6 MesosAppMasterRunner
[ https://issues.apache.org/jira/browse/FLINK-6630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16133057#comment-16133057 ] ASF GitHub Bot commented on FLINK-6630: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4555#discussion_r133975661 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java --- @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.entrypoint; + +import org.apache.flink.configuration.AkkaOptions; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.mesos.runtime.clusterframework.MesosConfigKeys; +import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.security.SecurityUtils; +import org.apache.flink.runtime.taskexecutor.TaskManagerRunner; +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.runtime.util.JvmShutdownSafeguard; +import org.apache.flink.runtime.util.SignalHandler; +import org.apache.flink.util.Preconditions; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.Callable; + +/** + * The entry point for running a TaskManager in a Mesos container. + */ +public class MesosTaskExecutorRunner { + + private static final Logger LOG = LoggerFactory.getLogger(MesosTaskExecutorRunner.class); + + private static final int INIT_ERROR_EXIT_CODE = 31; + + private static final Options ALL_OPTIONS; + + static { + ALL_OPTIONS = + new Options() + .addOption(BootstrapTools.newDynamicPropertiesOption()); + } + + /** The process environment variables. 
*/ + private static final Map<String, String> ENV = System.getenv(); + + public static void main(String[] args) throws Exception { + EnvironmentInformation.logEnvironmentInfo(LOG, MesosTaskExecutorRunner.class.getSimpleName(), args); + SignalHandler.register(LOG); + JvmShutdownSafeguard.installAsShutdownHook(LOG); + + // try to parse the command line arguments + CommandLineParser parser = new PosixParser(); + CommandLine cmd = parser.parse(ALL_OPTIONS, args); + + final Configuration configuration; + try { + final Configuration dynamicProperties = BootstrapTools.parseDynamicProperties(cmd); + GlobalConfiguration.setDynamicProperties(dynamicProperties); + LOG.debug("Mesos dynamic properties: {}", dynamicProperties); + + configuration = GlobalConfiguration.loadConfiguration(); --- End diff -- Here I think we could use `MesosEntrypointUtils#loadConfiguration(cmd)`. > Implement FLIP-6 MesosAppMasterRunner > - > > Key: FLINK-6630 > URL: https://issues.apache.org/jira/browse/FLINK-6630 > Project: Flink > Issue Type: Sub-task > Components: Mesos >Reporter: Eron Wright >Assignee: Eron Wright > > A new runner must be developed for the FLIP-6 RM. Target the "single job" > scenario. > Take some time to consider a general solution or a base implementation that > is shared with the old
[GitHub] flink pull request #4555: [FLINK-6630] [Mesos] Implement FLIP-6 MesosAppMast...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4555#discussion_r133974899 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java --- @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.entrypoint; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager; +import org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters; +import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices; +import org.apache.flink.mesos.runtime.clusterframework.services.MesosServicesUtils; +import org.apache.flink.mesos.util.MesosConfiguration; +import org.apache.flink.runtime.blob.BlobServer; +import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.clusterframework.ContainerSpecification; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.resourcemanager.ResourceManager; +import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration; +import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices; +import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.util.FlinkException; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.ObjectInputStream; +import 
java.util.Map; + +/** + * Entry point for Mesos per-job clusters. + */ +public class MesosJobClusterEntrypoint extends JobClusterEntrypoint { + + public static final String JOB_GRAPH_FILE_PATH = "flink.jobgraph.path"; + + // + // Command-line options + // + + private static final Options ALL_OPTIONS; + + static { + ALL_OPTIONS = + new Options() + .addOption(BootstrapTools.newDynamicPropertiesOption()); + } + + private MesosConfiguration schedulerConfiguration; + + private MesosServices mesosServices; + + private MesosTaskManagerParameters taskManagerParameters; + + private ContainerSpecification taskManagerContainerSpec; + + public MesosJobClusterEntrypoint(Configuration config) { + super(config); + } + + @Override + protected void initializeServices(Configuration config) throws Exception { + super.initializeServices(config); + + final String hostname = config.getString(JobManagerOptions.ADDRESS); + + // Mesos configuration + schedulerConfiguration = MesosEntrypointUtils.createMesosSchedulerConfiguration(config, hostname); + + // services + mesosServices = MesosServicesUtils.createMesosServices(config, hostname); + + // TM configuration +
[jira] [Commented] (FLINK-6630) Implement FLIP-6 MesosAppMasterRunner
[ https://issues.apache.org/jira/browse/FLINK-6630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16133056#comment-16133056 ] ASF GitHub Bot commented on FLINK-6630: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4555#discussion_r133974899 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java --- @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.entrypoint; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager; +import org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters; +import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices; +import org.apache.flink.mesos.runtime.clusterframework.services.MesosServicesUtils; +import org.apache.flink.mesos.util.MesosConfiguration; +import org.apache.flink.runtime.blob.BlobServer; +import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.clusterframework.ContainerSpecification; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.resourcemanager.ResourceManager; +import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration; +import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices; +import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.util.FlinkException; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.ObjectInputStream; +import 
java.util.Map; + +/** + * Entry point for Mesos per-job clusters. + */ +public class MesosJobClusterEntrypoint extends JobClusterEntrypoint { + + public static final String JOB_GRAPH_FILE_PATH = "flink.jobgraph.path"; + + // + // Command-line options + // + + private static final Options ALL_OPTIONS; + + static { + ALL_OPTIONS = + new Options() + .addOption(BootstrapTools.newDynamicPropertiesOption()); + } + + private MesosConfiguration schedulerConfiguration; + + private MesosServices mesosServices; + + private MesosTaskManagerParameters taskManagerParameters; + + private ContainerSpecification taskManagerContainerSpec; + + public MesosJobClusterEntrypoint(Configuration config) { + super(config); + } + + @Override + protected void initializeServices(Configuration config) throws Exception { + super.initializeServices(config); + + final String hostname = config.getString(JobManagerOptions.ADDRESS); + + // Mesos configuration + schedulerConfiguration =
[GitHub] flink pull request #4555: [FLINK-6630] [Mesos] Implement FLIP-6 MesosAppMast...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4555#discussion_r133974223 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java --- @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.entrypoint; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.mesos.configuration.MesosOptions; +import org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters; +import org.apache.flink.mesos.util.MesosConfiguration; +import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.clusterframework.ContainerSpecification; +import org.apache.flink.runtime.clusterframework.overlays.CompositeContainerOverlay; +import org.apache.flink.runtime.clusterframework.overlays.FlinkDistributionOverlay; +import org.apache.flink.runtime.clusterframework.overlays.HadoopConfOverlay; +import org.apache.flink.runtime.clusterframework.overlays.HadoopUserOverlay; +import org.apache.flink.runtime.clusterframework.overlays.KeytabOverlay; +import org.apache.flink.runtime.clusterframework.overlays.Krb5ConfOverlay; +import org.apache.flink.runtime.clusterframework.overlays.SSLStoreOverlay; + +import org.apache.commons.cli.CommandLine; +import org.apache.mesos.Protos; +import org.slf4j.Logger; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; + +/** + * Utils for Mesos entrpoints. + */ +public class MesosEntrypointUtils { + + @Deprecated + public static Configuration loadConfiguration(CommandLine cmd) { + + // merge the dynamic properties from the command-line + Configuration dynamicProperties = BootstrapTools.parseDynamicProperties(cmd); + GlobalConfiguration.setDynamicProperties(dynamicProperties); + Configuration config = GlobalConfiguration.loadConfiguration(); + + return config; + } + + /** +* Loads and validates the Mesos scheduler configuration. +* @param flinkConfig the global configuration. 
+* @param hostname the hostname to advertise to the Mesos master. +*/ + public static MesosConfiguration createMesosSchedulerConfiguration(Configuration flinkConfig, String hostname) { + + Protos.FrameworkInfo.Builder frameworkInfo = Protos.FrameworkInfo.newBuilder() + .setHostname(hostname); + Protos.Credential.Builder credential = null; + + if (!flinkConfig.contains(MesosOptions.MASTER_URL)) { + throw new IllegalConfigurationException(MesosOptions.MASTER_URL.key() + " must be configured."); + } + String masterUrl = flinkConfig.getString(MesosOptions.MASTER_URL); + + Duration failoverTimeout = FiniteDuration.apply( + flinkConfig.getInteger( + MesosOptions.FAILOVER_TIMEOUT_SECONDS), + TimeUnit.SECONDS); + frameworkInfo.setFailoverTimeout(failoverTimeout.toSeconds()); + + frameworkInfo.setName(flinkConfig.getString( + MesosOptions.RESOURCEMANAGER_FRAMEWORK_NAME)); + + frameworkInfo.setRole(flinkConfig.getString( + MesosOptions.RESOURCEMANAGER_FRAMEWORK_ROLE)); + + frameworkInfo.setUser(flinkConfig.getString( + MesosOptions.RESOURCEMANAGER_FRAMEWORK_USER)); + + if (flinkConfig.contains(MesosOptions.RESOURCEMANAGER_FRAMEWORK_PRINCIPAL)) { +
[GitHub] flink pull request #4555: [FLINK-6630] [Mesos] Implement FLIP-6 MesosAppMast...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4555#discussion_r133975661 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java --- @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.entrypoint; + +import org.apache.flink.configuration.AkkaOptions; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.mesos.runtime.clusterframework.MesosConfigKeys; +import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.security.SecurityUtils; +import org.apache.flink.runtime.taskexecutor.TaskManagerRunner; +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.runtime.util.JvmShutdownSafeguard; +import org.apache.flink.runtime.util.SignalHandler; +import org.apache.flink.util.Preconditions; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.Callable; + +/** + * The entry point for running a TaskManager in a Mesos container. + */ +public class MesosTaskExecutorRunner { + + private static final Logger LOG = LoggerFactory.getLogger(MesosTaskExecutorRunner.class); + + private static final int INIT_ERROR_EXIT_CODE = 31; + + private static final Options ALL_OPTIONS; + + static { + ALL_OPTIONS = + new Options() + .addOption(BootstrapTools.newDynamicPropertiesOption()); + } + + /** The process environment variables. 
*/ + private static final Map<String, String> ENV = System.getenv(); + + public static void main(String[] args) throws Exception { + EnvironmentInformation.logEnvironmentInfo(LOG, MesosTaskExecutorRunner.class.getSimpleName(), args); + SignalHandler.register(LOG); + JvmShutdownSafeguard.installAsShutdownHook(LOG); + + // try to parse the command line arguments + CommandLineParser parser = new PosixParser(); + CommandLine cmd = parser.parse(ALL_OPTIONS, args); + + final Configuration configuration; + try { + final Configuration dynamicProperties = BootstrapTools.parseDynamicProperties(cmd); + GlobalConfiguration.setDynamicProperties(dynamicProperties); + LOG.debug("Mesos dynamic properties: {}", dynamicProperties); + + configuration = GlobalConfiguration.loadConfiguration(); --- End diff -- Here I think we could use `MesosEntrypointUtils#loadConfiguration(cmd)`.
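The review suggests replacing the inline parsing in `MesosTaskExecutorRunner` with the shared `MesosEntrypointUtils#loadConfiguration(cmd)`, so every Mesos entrypoint merges command-line dynamic properties in the same way. The sketch below illustrates that merge rule with plain maps only. It is hypothetical: the real code works on commons-cli `CommandLine` objects via `BootstrapTools.parseDynamicProperties` and `GlobalConfiguration`. The `-Dkey=value` argument form and the "command line overrides file" precedence are assumptions stated for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical illustration of a shared "load configuration" helper:
 * parse -Dkey=value dynamic properties and let them override file-based settings.
 */
final class DynamicPropertiesMerge {

    private DynamicPropertiesMerge() {}

    /** Collects -Dkey=value arguments; anything else (or a -D without '=') is ignored. */
    static Map<String, String> parseDynamicProperties(String[] args) {
        Map<String, String> props = new HashMap<>();
        for (String arg : args) {
            if (arg.startsWith("-D")) {
                String keyValue = arg.substring(2);
                int eq = keyValue.indexOf('=');
                if (eq > 0) {
                    props.put(keyValue.substring(0, eq), keyValue.substring(eq + 1));
                }
            }
        }
        return props;
    }

    /** Returns a new map; command-line values take precedence over file values. */
    static Map<String, String> merge(Map<String, String> fromFile, Map<String, String> dynamic) {
        Map<String, String> merged = new HashMap<>(fromFile);
        merged.putAll(dynamic);
        return merged;
    }
}
```

Keeping this logic in one helper means a fix to the precedence rules lands in all entrypoints at once, which appears to be the point of the reviewer's suggestion.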
[jira] [Commented] (FLINK-6630) Implement FLIP-6 MesosAppMasterRunner
[ https://issues.apache.org/jira/browse/FLINK-6630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16133055#comment-16133055 ] ASF GitHub Bot commented on FLINK-6630: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4555#discussion_r133974223 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java --- @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.entrypoint; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.mesos.configuration.MesosOptions; +import org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters; +import org.apache.flink.mesos.util.MesosConfiguration; +import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.clusterframework.ContainerSpecification; +import org.apache.flink.runtime.clusterframework.overlays.CompositeContainerOverlay; +import org.apache.flink.runtime.clusterframework.overlays.FlinkDistributionOverlay; +import org.apache.flink.runtime.clusterframework.overlays.HadoopConfOverlay; +import org.apache.flink.runtime.clusterframework.overlays.HadoopUserOverlay; +import org.apache.flink.runtime.clusterframework.overlays.KeytabOverlay; +import org.apache.flink.runtime.clusterframework.overlays.Krb5ConfOverlay; +import org.apache.flink.runtime.clusterframework.overlays.SSLStoreOverlay; + +import org.apache.commons.cli.CommandLine; +import org.apache.mesos.Protos; +import org.slf4j.Logger; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; + +/** + * Utils for Mesos entrpoints. + */ +public class MesosEntrypointUtils { + + @Deprecated + public static Configuration loadConfiguration(CommandLine cmd) { + + // merge the dynamic properties from the command-line + Configuration dynamicProperties = BootstrapTools.parseDynamicProperties(cmd); + GlobalConfiguration.setDynamicProperties(dynamicProperties); + Configuration config = GlobalConfiguration.loadConfiguration(); + + return config; + } + + /** +* Loads and validates the Mesos scheduler configuration. +* @param flinkConfig the global configuration. 
+* @param hostname the hostname to advertise to the Mesos master. +*/ + public static MesosConfiguration createMesosSchedulerConfiguration(Configuration flinkConfig, String hostname) { + + Protos.FrameworkInfo.Builder frameworkInfo = Protos.FrameworkInfo.newBuilder() + .setHostname(hostname); + Protos.Credential.Builder credential = null; + + if (!flinkConfig.contains(MesosOptions.MASTER_URL)) { + throw new IllegalConfigurationException(MesosOptions.MASTER_URL.key() + " must be configured."); + } + String masterUrl = flinkConfig.getString(MesosOptions.MASTER_URL); + + Duration failoverTimeout = FiniteDuration.apply( + flinkConfig.getInteger( + MesosOptions.FAILOVER_TIMEOUT_SECONDS), + TimeUnit.SECONDS); + frameworkInfo.setFailoverTimeout(failoverTimeout.toSeconds()); + + frameworkInfo.setName(flinkConfig.getString( + MesosOptions.RESOURCEMANAGER_FRAMEWORK_NAME)); + + frameworkInfo.setRole(flinkConfig.getString( + MesosOptions.RESOURCEMANAGER_FRAMEWORK_ROLE)); + + frameworkInfo.setUser(flinkConfig.getString(
[GitHub] flink pull request #4555: [FLINK-6630] [Mesos] Implement FLIP-6 MesosAppMast...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4555#discussion_r133975100 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java --- @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.entrypoint; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager; +import org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters; +import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices; +import org.apache.flink.mesos.runtime.clusterframework.services.MesosServicesUtils; +import org.apache.flink.mesos.util.MesosConfiguration; +import org.apache.flink.runtime.blob.BlobServer; +import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.clusterframework.ContainerSpecification; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.resourcemanager.ResourceManager; +import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration; +import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices; +import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.util.FlinkException; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.ObjectInputStream; +import 
java.util.Map; + +/** + * Entry point for Mesos per-job clusters. + */ +public class MesosJobClusterEntrypoint extends JobClusterEntrypoint { + + public static final String JOB_GRAPH_FILE_PATH = "flink.jobgraph.path"; + + // + // Command-line options + // + + private static final Options ALL_OPTIONS; + + static { + ALL_OPTIONS = + new Options() + .addOption(BootstrapTools.newDynamicPropertiesOption()); + } + + private MesosConfiguration schedulerConfiguration; + + private MesosServices mesosServices; + + private MesosTaskManagerParameters taskManagerParameters; + + private ContainerSpecification taskManagerContainerSpec; + + public MesosJobClusterEntrypoint(Configuration config) { + super(config); + } + + @Override + protected void initializeServices(Configuration config) throws Exception { + super.initializeServices(config); + + final String hostname = config.getString(JobManagerOptions.ADDRESS); + + // Mesos configuration + schedulerConfiguration = MesosEntrypointUtils.createMesosSchedulerConfiguration(config, hostname); + + // services + mesosServices = MesosServicesUtils.createMesosServices(config, hostname); + + // TM configuration +
[jira] [Commented] (FLINK-7300) End-to-end tests are instable on Travis
[ https://issues.apache.org/jira/browse/FLINK-7300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16133053#comment-16133053 ] Aljoscha Krettek commented on FLINK-7300: - This time the failure is caused by an {{AskTimeoutException}}. The problem is that even a normal run of Flink can have exceptions and errors in the log. In our release testing we have a section about "running a cluster and verifying that the log and output are clear of exceptions and errors". I think in the real world the log is never clear of exceptions and errors, even when everything went well. [~till.rohrmann] Do you think we should just not test for the log being clean? I could also add {{AskTimeoutException}} to the list of exceptions that we expect to occur. I'm guessing this just sometimes occurs with Akka? > End-to-end tests are instable on Travis > --- > > Key: FLINK-7300 > URL: https://issues.apache.org/jira/browse/FLINK-7300 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.4.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Aljoscha Krettek > Labels: test-stability > Fix For: 1.4.0 > > > It seems like the end-to-end tests are unstable, causing the {{misc}} build > profile to fail sporadically. > Incorrectly matched output: > https://s3.amazonaws.com/archive.travis-ci.org/jobs/258569408/log.txt?X-Amz-Expires=30=20170731T060526Z=AWS4-HMAC-SHA256=AKIAJRYRXRSVGNKPKO5A/20170731/us-east-1/s3/aws4_request=host=4ef9ff5e60fe06db53a84be8d73775a46cb595a8caeb806b05dbbf824d3b69e8 > Another failure example, with a different cause than the above, also in the > end-to-end tests: > https://s3.amazonaws.com/archive.travis-ci.org/jobs/258841693/log.txt?X-Amz-Expires=30=20170731T060007Z=AWS4-HMAC-SHA256=AKIAJRYRXRSVGNKPKO5A/20170731/us-east-1/s3/aws4_request=host=4a106b3990228b7628c250cc15407bc2c131c8332e1a94ad68d649fe8d32d726 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
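If the expected-exceptions route is taken, the check itself stays small. The sketch below assumes the test scans log lines for the substrings `Exception` and `ERROR` and ignores any line that mentions an allowlisted exception name such as `AskTimeoutException`. `LogChecker` is a hypothetical name; the actual end-to-end tests are shell scripts, so this only illustrates the filtering logic.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * Hypothetical log check: flags lines that mention an exception or an error,
 * unless the line mentions one of the explicitly expected exception names.
 */
final class LogChecker {

    private final Set<String> expectedExceptions;

    LogChecker(Set<String> expectedExceptions) {
        this.expectedExceptions = expectedExceptions;
    }

    /** Returns the log lines that should fail the test. */
    List<String> unexpectedLines(List<String> logLines) {
        return logLines.stream()
                // only lines that look like an exception or an error are candidates
                .filter(line -> line.contains("Exception") || line.contains("ERROR"))
                // drop lines that mention an allowlisted exception
                .filter(line -> expectedExceptions.stream().noneMatch(line::contains))
                .collect(Collectors.toList());
    }
}
```

Matching by substring keeps the allowlist short, but it also hides any other problem that happens to be reported on the same line as an allowlisted exception.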
[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16133050#comment-16133050 ] Xingcan Cui commented on FLINK-7446: Thanks for the response, [~fhueske]. From my perspective, I prefer the latter option (create new interfaces/operators...). Maybe we can provide one or more default watermark generators. Users can directly set them by providing some parameters (e.g., the watermark interval and the expected delay relative to the latest rowtime). Moreover, if the provided watermark generators cannot meet the requirements, users can implement their own. What do you think, [~jark]? > Support to define an existing field as the rowtime field for TableSource > > > Key: FLINK-7446 > URL: https://issues.apache.org/jira/browse/FLINK-7446 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Jark Wu >Assignee: Jark Wu > > Currently {{DefinedRowtimeAttribute}} can define an appended rowtime field > for a {{TableSource}}. But it would be helpful if we could support defining an > existing field as the rowtime field. Just like when registering a DataStream, the > rowtime field could then not only be appended but also replace an existing field. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
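A minimal sketch of such a default generator, assuming it is parameterised only by the expected delay and that the caller polls it once per watermark interval. The class is hypothetical and not an existing Table API interface. The idea resembles the DataStream API's `BoundedOutOfOrdernessTimestampExtractor`.

```java
/**
 * Hypothetical default watermark generator for an existing rowtime field:
 * the watermark trails the largest rowtime seen so far by a fixed delay.
 */
final class BoundedDelayWatermarkGenerator {

    private final long expectedDelayMillis;
    private long maxRowtime = Long.MIN_VALUE;

    BoundedDelayWatermarkGenerator(long expectedDelayMillis) {
        if (expectedDelayMillis < 0) {
            throw new IllegalArgumentException("expected delay must not be negative");
        }
        this.expectedDelayMillis = expectedDelayMillis;
    }

    /** Called for every row with the value of its rowtime field. */
    void onRow(long rowtime) {
        maxRowtime = Math.max(maxRowtime, rowtime);
    }

    /**
     * Meant to be called once per watermark interval. Returns Long.MIN_VALUE
     * until the first row has been seen, so the subtraction cannot underflow.
     */
    long currentWatermark() {
        return maxRowtime == Long.MIN_VALUE ? Long.MIN_VALUE : maxRowtime - expectedDelayMillis;
    }
}
```

Users whose data does not fit a fixed delay would then plug in their own generator instead, as suggested above.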
[jira] [Commented] (FLINK-7477) Use "hadoop classpath" to augment classpath when available
[ https://issues.apache.org/jira/browse/FLINK-7477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16133044#comment-16133044 ] ASF GitHub Bot commented on FLINK-7477: --- GitHub user aljoscha opened a pull request: https://github.com/apache/flink/pull/4566 [FLINK-7477] [FLINK-7480] Various improvements to Flink scripts You can merge this pull request into a Git repository by running: $ git pull https://github.com/aljoscha/flink hadoop-env-improvements Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4566.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4566 commit 6b4d7e5e09dcd913fbb9c84c59fc8a10e6c662cc Author: Aljoscha Krettek Date: 2017-08-18T14:39:41Z [FLINK-7477] Use "hadoop classpath" to augment classpath when available This improves the out-of-box experience on GCE and AWS, both of which don't set a HADOOP_CLASSPATH but have "hadoop" available on the $PATH. commit f63e2d03d739014f0cd94634d731e552a02c76d9 Author: Aljoscha Krettek Date: 2017-08-18T14:40:55Z [FLINK-7480] Set HADOOP_CONF_DIR to sane default if not set This improves the out-of-box experience on GCE and AWS, both of which don't set HADOOP_CONF_DIR by default but use /etc/hadoop/conf > Use "hadoop classpath" to augment classpath when available > -- > > Key: FLINK-7477 > URL: https://issues.apache.org/jira/browse/FLINK-7477 > Project: Flink > Issue Type: Bug > Components: Startup Shell Scripts >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek > Fix For: 1.4.0 > > > Currently, some cloud environments don't properly put the Hadoop jars into > {{HADOOP_CLASSPATH}} (or don't set {{HADOOP_CLASSPATH}}) at all. We should > check in {{config.sh}} if the {{hadoop}} binary is on the path and augment > our {{INTERNAL_HADOOP_CLASSPATHS}} with the result of {{hadoop classpath}} in > our scripts.
> This will improve the out-of-box experience of users that otherwise have to > manually set {{HADOOP_CLASSPATH}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4566: [FLINK-7477] [FLINK-7480] Various improvements to ...
GitHub user aljoscha opened a pull request: https://github.com/apache/flink/pull/4566 [FLINK-7477] [FLINK-7480] Various improvements to Flink scripts You can merge this pull request into a Git repository by running: $ git pull https://github.com/aljoscha/flink hadoop-env-improvements Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4566.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4566 commit 6b4d7e5e09dcd913fbb9c84c59fc8a10e6c662cc Author: Aljoscha Krettek Date: 2017-08-18T14:39:41Z [FLINK-7477] Use "hadoop classpath" to augment classpath when available This improves the out-of-box experience on GCE and AWS, both of which don't set a HADOOP_CLASSPATH but have "hadoop" available on the $PATH. commit f63e2d03d739014f0cd94634d731e552a02c76d9 Author: Aljoscha Krettek Date: 2017-08-18T14:40:55Z [FLINK-7480] Set HADOOP_CONF_DIR to sane default if not set This improves the out-of-box experience on GCE and AWS, both of which don't set HADOOP_CONF_DIR by default but use /etc/hadoop/conf
[jira] [Updated] (FLINK-7477) Use "hadoop classpath" to augment classpath when available
[ https://issues.apache.org/jira/browse/FLINK-7477?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-7477: Fix Version/s: 1.4.0 > Use "hadoop classpath" to augment classpath when available > -- > > Key: FLINK-7477 > URL: https://issues.apache.org/jira/browse/FLINK-7477 > Project: Flink > Issue Type: Bug > Components: Startup Shell Scripts >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek > Fix For: 1.4.0 > > > Currently, some cloud environments don't properly put the Hadoop jars into > {{HADOOP_CLASSPATH}} (or don't set {{HADOOP_CLASSPATH}}) at all. We should > check in {{config.sh}} if the {{hadoop}} binary is on the path and augment > our {{INTERNAL_HADOOP_CLASSPATHS}} with the result of {{hadoop classpath}} in > our scripts. > This will improve the out-of-box experience of users that otherwise have to > manually set {{HADOOP_CLASSPATH}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7480) Set HADOOP_CONF_DIR to sane default if not set
Aljoscha Krettek created FLINK-7480: --- Summary: Set HADOOP_CONF_DIR to sane default if not set Key: FLINK-7480 URL: https://issues.apache.org/jira/browse/FLINK-7480 Project: Flink Issue Type: Improvement Components: Startup Shell Scripts Reporter: Aljoscha Krettek Assignee: Aljoscha Krettek Fix For: 1.4.0 Currently, neither AWS nor GCE sets {{HADOOP_CONF_DIR}} by default. This makes the out-of-the-box experience on these cloud environments poor, because leaving it unset results in errors whose cause is not obvious. If {{HADOOP_CONF_DIR}} is not set, we should check whether {{/etc/hadoop/conf}} exists and, if so, set {{HADOOP_CONF_DIR}} to that directory. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
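The proposed fallback is a three-way decision. The real change belongs in Flink's startup shell scripts; the Java sketch below only mirrors that decision logic, and `HadoopConfDirResolver` is a hypothetical name. The directory check is passed in so the logic can be exercised without touching the file system.

```java
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;

/** Hypothetical Java rendering of the HADOOP_CONF_DIR fallback described above. */
final class HadoopConfDirResolver {

    static final String DEFAULT_CONF_DIR = "/etc/hadoop/conf";

    /**
     * @param env         environment variables, e.g. System.getenv()
     * @param isDirectory existence check, e.g. p -> Files.isDirectory(Paths.get(p))
     */
    static Optional<String> resolve(Map<String, String> env, Predicate<String> isDirectory) {
        String configured = env.get("HADOOP_CONF_DIR");
        if (configured != null && !configured.isEmpty()) {
            return Optional.of(configured); // an explicit setting always wins
        }
        // fall back to the default location, but only if that directory exists
        return isDirectory.test(DEFAULT_CONF_DIR) ? Optional.of(DEFAULT_CONF_DIR) : Optional.empty();
    }
}
```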
[jira] [Commented] (FLINK-7471) Improve bounded OVER support non-retract method AGG
[ https://issues.apache.org/jira/browse/FLINK-7471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16133031#comment-16133031 ] Jark Wu commented on FLINK-7471: Hi [~fhueske], I think [~sunjincheng121] means that retraction for over aggregates is an optimization in most cases, but not in all cases. For example, min/max with retraction needs to store all records of the group in the min/max accumulator. It's hard to say that the overhead of retraction mode (storing all records) is always lower than that of non-retraction mode (recomputing over all records of a group). That's why the over window can also support non-retract AGGs. Whether the over window runs in retract or non-retract mode depends on whether all the aggregates implement the retract method, so the retract method is no longer mandatory for over windows. > Improve bounded OVER support non-retract method AGG > --- > > Key: FLINK-7471 > URL: https://issues.apache.org/jira/browse/FLINK-7471 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: sunjincheng >Assignee: sunjincheng > > Currently, BOUNDED OVER WINDOW only supports AGGs that have a {{retract}} method. This > JIRA will add support for AGGs without a retract method. > What do you think? [~fhueske] -- This message was sent by Atlassian JIRA (v6.4.14#64029)
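To make the MIN example concrete: a retractable MIN must keep every value currently in the window so that a value leaving the window can be removed, while a non-retract MIN stores nothing and rescans the rows of the bounded window for each output. This is plain Java, not Flink's `AggregateFunction` API, and the class names are hypothetical.

```java
import java.util.TreeMap;

/**
 * Retract-mode MIN: to undo a value that leaves the window, the accumulator
 * has to keep every value still in the window (here as value -> count).
 */
final class RetractableMin {
    private final TreeMap<Long, Integer> counts = new TreeMap<>();

    void accumulate(long value) {
        counts.merge(value, 1, Integer::sum);
    }

    void retract(long value) {
        // decrement the count; returning null removes the entry when it reaches zero
        counts.computeIfPresent(value, (v, c) -> c == 1 ? null : c - 1);
    }

    Long getValue() {
        return counts.isEmpty() ? null : counts.firstKey();
    }
}

/** Non-retract MIN: no per-window state; the bounded window's rows are rescanned per output. */
final class RecomputedMin {
    static Long minOf(Iterable<Long> windowRows) {
        Long min = null;
        for (Long value : windowRows) {
            if (min == null || value < min) {
                min = value;
            }
        }
        return min;
    }
}
```

Which variant is cheaper depends on the window size and on how many rows enter and leave the window per output, which is why neither mode is a clear win in general.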
[GitHub] flink issue #4530: [FLINK-7245] [stream] Support holding back watermarks wit...
Github user xccui commented on the issue: https://github.com/apache/flink/pull/4530 Thanks for the comment, @aljoscha. IMO, making `timeServiceManager` protected would indeed minimise the impact on `AbstractStreamOperator`, but it may introduce duplicated code in the subclasses. We made a trade-off here.
[jira] [Commented] (FLINK-7245) Enhance the operators to support holding back watermarks
[ https://issues.apache.org/jira/browse/FLINK-7245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16133027#comment-16133027 ] ASF GitHub Bot commented on FLINK-7245: --- Github user xccui commented on the issue: https://github.com/apache/flink/pull/4530 Thanks for the comment, @aljoscha. IMO, making `timeServiceManager` protected would indeed minimise the impact on `AbstractStreamOperator`, but it may introduce duplicated code in the subclasses. We made a trade-off here. > Enhance the operators to support holding back watermarks > > > Key: FLINK-7245 > URL: https://issues.apache.org/jira/browse/FLINK-7245 > Project: Flink > Issue Type: New Feature > Components: DataStream API >Reporter: Xingcan Cui >Assignee: Xingcan Cui > > Currently, watermarks are applied and emitted instantly by the > {{AbstractStreamOperator}}. > {code:java} > public void processWatermark(Watermark mark) throws Exception { > if (timeServiceManager != null) { > timeServiceManager.advanceWatermark(mark); > } > output.emitWatermark(mark); > } > {code} > Some calculation results (with timestamp fields) triggered by these > watermarks (e.g., join or aggregate results) may be regarded as late by > the downstream operators, since their timestamps are necessarily less than or equal to > the watermarks that triggered them. > This issue aims to add another "working mode", which supports holding back > watermarks, to the current operators. These watermarks should be blocked and > stored by the operators until all the corresponding newly generated results are > emitted. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
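A framework-free sketch of the proposed working mode, assuming the operator knows when all results triggered by a watermark have been emitted. Class and method names are hypothetical and are not the API of the pull request. Strings stand in for the records and watermarks that Flink would send through the operator's output.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical "hold back" mode: incoming watermarks are stored instead of being
 * forwarded right away, and are released only after the results they triggered.
 */
final class WatermarkHoldBack {

    /** Emission order, e.g. "record@8" or "watermark@10". */
    final List<String> output = new ArrayList<>();

    private long heldWatermark = Long.MIN_VALUE;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    /** Replaces the immediate output.emitWatermark(mark) of the current behaviour. */
    void processWatermark(long watermark) {
        heldWatermark = Math.max(heldWatermark, watermark);
    }

    /** Emits a result (e.g. a join or aggregate row) triggered by the held watermark. */
    void emitResult(long timestamp) {
        output.add("record@" + timestamp);
    }

    /** Called once all triggered results are out; never emits the same watermark twice. */
    void releaseHeldWatermark() {
        if (heldWatermark > lastEmittedWatermark) {
            output.add("watermark@" + heldWatermark);
            lastEmittedWatermark = heldWatermark;
        }
    }
}
```

Because the watermark is released only after the results, a downstream operator sees `record@8` and `record@10` before `watermark@10` and does not treat them as late.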
[jira] [Updated] (FLINK-7479) Support to retrieve the past event by physical offset
[ https://issues.apache.org/jira/browse/FLINK-7479?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dian Fu updated FLINK-7479: --- Summary: Support to retrieve the past event by physical offset (was: Support to retrieve the past event by an offset ) > Support to retrieve the past event by physical offset > -- > > Key: FLINK-7479 > URL: https://issues.apache.org/jira/browse/FLINK-7479 > Project: Flink > Issue Type: Sub-task > Components: CEP, Table API & SQL >Reporter: Dian Fu >Assignee: Dian Fu > > Currently, it is already possible to retrieve events matched to a specified > pattern in {{IterativeCondition.Context}}. However, there are also requirements > to retrieve events by a physical offset, and the retrieved events may not have > matched any pattern. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7479) Support to retrieve the past event by an offset
[ https://issues.apache.org/jira/browse/FLINK-7479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16133007#comment-16133007 ] Dian Fu commented on FLINK-7479: This is to support the {{PREV}} clause in {{MATCH_RECOGNIZE}}. As described in the [doc|https://docs.oracle.com/database/121/DWHSG/pattern.htm#DWHSG8996], {{PREV}} defines an expression using a previous row by physical offset. This means that in the {{filter}} method of {{IterativeCondition}}, we may also need to access past events by physical offset, and the accessed events may not have been matched by any pattern. > Support to retrieve the past event by an offset > > > Key: FLINK-7479 > URL: https://issues.apache.org/jira/browse/FLINK-7479 > Project: Flink > Issue Type: Sub-task > Components: CEP, Table API & SQL >Reporter: Dian Fu >Assignee: Dian Fu > > Currently, it is already possible to retrieve events matched to a specified > pattern in {{IterativeCondition.Context}}. However, there are also requirements > to retrieve events by a physical offset, and the retrieved events may not have > matched any pattern. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
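A sketch of the state that PREV-style access needs, assuming the offset is bounded by the largest offset used in the pattern definition: a fixed-size buffer of the most recent events, maintained independently of pattern matching. `PastEventBuffer` is hypothetical and not part of `IterativeCondition.Context`.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.Optional;

/**
 * Hypothetical bounded history for PREV-style access: keeps the most recent
 * events in arrival order, whether or not they matched any pattern.
 */
final class PastEventBuffer<T> {

    private final int capacity;
    private final Deque<T> events = new ArrayDeque<>(); // rejects null events

    PastEventBuffer(int capacity) {
        if (capacity < 1) {
            throw new IllegalArgumentException("capacity must be at least 1");
        }
        this.capacity = capacity;
    }

    /** Records an event after it has been evaluated, evicting the oldest if full. */
    void add(T event) {
        if (events.size() == capacity) {
            events.removeFirst();
        }
        events.addLast(event);
    }

    /** prev(1) is the event just before the current one, prev(2) the one before that, ... */
    Optional<T> prev(int offset) {
        if (offset < 1 || offset > events.size()) {
            return Optional.empty();
        }
        Iterator<T> newestFirst = events.descendingIterator();
        T event = null;
        for (int i = 0; i < offset; i++) {
            event = newestFirst.next();
        }
        return Optional.of(event);
    }
}
```

A condition such as `PREV(price, 1) < price` would then be evaluated against `prev(1)` before the current event is added to the buffer.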
[GitHub] flink issue #4553: [FLINK-7642] [docs] Add very obvious warning about outdat...
Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/4553 AFAIK the problem is that the doc builders are independent of the Flink version, so we use the same toolchain for building the 0.8 docs and the 1.4 docs. The `flink.conf` file in https://svn.apache.org/repos/infra/infrastructure/buildbot/aegis/buildmaster/master1/projects contains the builder config. I walked @twalthr through the ugly details of it a few weeks ago; maybe he can look into it.
[jira] [Commented] (FLINK-7429) Add restore from 1.2 / 1.3 migration tests for FlinkKinesisConsumer
[ https://issues.apache.org/jira/browse/FLINK-7429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16132182#comment-16132182 ] ASF GitHub Bot commented on FLINK-7429: --- GitHub user tzulitai opened a pull request: https://github.com/apache/flink/pull/4565 [FLINK-7429] [kinesis] Add migration test coverage for Flink 1.2 and 1.3 ## What is the purpose of the change The migration tests for the Kinesis consumer did not cover Flink 1.2 and 1.3. This pull request fixes that. It also fixes a minor bug, discovered while adding these new migration tests, where restoring from empty state behaved differently across Flink versions. ## Brief change log - (1st commit) generalize `FlinkKinesisConsumerMigrationTest` to cover all Flink versions & add stored savepoints for Flink 1.2 and 1.3. - (2nd commit) Fix different restore behaviour when restoring empty state. ## Verifying this change Previous behaviours should be covered by existing tests. The additional tests in `FlinkKinesisConsumerMigrationTest` are simply an extra guard that we were doing things correctly. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): **no** - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no** - The serializers: **no** - The runtime per-record code paths (performance sensitive): **no** - Anything that affects deployment or recovery: **no** ## Documentation - Does this pull request introduce a new feature? **no** - If yes, how is the feature documented? 
**not applicable** You can merge this pull request into a Git repository by running: $ git pull https://github.com/tzulitai/flink FLINK-7429 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4565.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4565 commit 04e73db786eb2cc7129ea6a8fbe05ca156ef3bce Author: Tzu-Li (Gordon) Tai Date: 2017-08-18T03:27:38Z [FLINK-7429] [kinesis] Add IT tests for migration from 1.2 / 1.3 commit 47fbdf7495a95682a4da69a5e9a060cdf4c496b3 Author: Tzu-Li (Gordon) Tai Date: 2017-08-18T13:17:27Z [FLINK-7429] [kinesis] Unify empty state restore behaviour across 1.1 / 1.2 / 1.3 Prior to this commit, when restoring empty state from previous Flink versions, the behaviour was different for each version. For older versions, restoring empty state results in `null`. For newer versions, restoring empty state results in an empty map. We want an empty map to represent "this is a restored run, but there was no state for us", and `null` to represent "this is not a restored run". > Add restore from 1.2 / 1.3 migration tests for FlinkKinesisConsumer > --- > > Key: FLINK-7429 > URL: https://issues.apache.org/jira/browse/FLINK-7429 > Project: Flink > Issue Type: Test > Components: Kinesis Connector, Tests >Affects Versions: 1.2.1, 1.4.0, 1.3.2 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.4.0, 1.3.3 > > > Currently, the `FlinkKinesisConsumerMigrationTest` only tests restore from > Flink 1.1. > We should extend that to also verify restoring from 1.2 and 1.3. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
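The convention from the second commit can be sketched as a small normalisation step at restore time. The sketch assumes the restored state is handed over as a map, which may be `null` when read from an older format. `RestoredOffsets` and its string keys and values, which stand in for shard metadata and sequence numbers, are hypothetical and not the consumer's actual fields.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical holder for restored offsets: null means "not a restored run",
 * an empty map means "restored, but there was no state for us".
 */
final class RestoredOffsets {

    private Map<String, String> offsetsToRestore; // stays null unless restored

    /**
     * @param isRestored    whether the job is starting from a checkpoint/savepoint
     * @param restoredState restored entries; older formats may hand back null for empty state
     */
    void initialize(boolean isRestored, Map<String, String> restoredState) {
        if (!isRestored) {
            offsetsToRestore = null;
            return;
        }
        // normalize: empty state from any Flink version becomes an empty map, never null
        offsetsToRestore = new HashMap<>();
        if (restoredState != null) {
            offsetsToRestore.putAll(restoredState);
        }
    }

    boolean isRestoredRun() {
        return offsetsToRestore != null;
    }

    Map<String, String> offsets() {
        return offsetsToRestore;
    }
}
```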
[GitHub] flink pull request #4565: [FLINK-7429] [kinesis] Add migration test coverage...
GitHub user tzulitai opened a pull request: https://github.com/apache/flink/pull/4565 [FLINK-7429] [kinesis] Add migration test coverage for Flink 1.2 and 1.3 ## What is the purpose of the change The migration tests for the Kinesis consumer did not cover Flink 1.2 and 1.3. This pull request fixes that. It also fixes a minor bug, discovered while adding these new migration tests, where restoring from empty state behaved differently across Flink versions. ## Brief change log - (1st commit) generalize `FlinkKinesisConsumerMigrationTest` to cover all Flink versions & add stored savepoints for Flink 1.2 and 1.3. - (2nd commit) Fix different restore behaviour when restoring empty state. ## Verifying this change Previous behaviours should be covered by existing tests. The additional tests in `FlinkKinesisConsumerMigrationTest` are simply an extra guard that we were doing things correctly. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): **no** - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no** - The serializers: **no** - The runtime per-record code paths (performance sensitive): **no** - Anything that affects deployment or recovery: **no** ## Documentation - Does this pull request introduce a new feature? **no** - If yes, how is the feature documented? 
**not applicable** You can merge this pull request into a Git repository by running: $ git pull https://github.com/tzulitai/flink FLINK-7429 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4565.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4565 commit 04e73db786eb2cc7129ea6a8fbe05ca156ef3bce Author: Tzu-Li (Gordon) Tai Date: 2017-08-18T03:27:38Z [FLINK-7429] [kinesis] Add IT tests for migration from 1.2 / 1.3 commit 47fbdf7495a95682a4da69a5e9a060cdf4c496b3 Author: Tzu-Li (Gordon) Tai Date: 2017-08-18T13:17:27Z [FLINK-7429] [kinesis] Unify empty state restore behaviour across 1.1 / 1.2 / 1.3 Prior to this commit, when restoring empty state from previous Flink versions, the behaviour was different for each version. For older versions, restoring empty state results in `null`. For newer versions, restoring empty state results in an empty map. We want an empty map to represent "this is a restored run, but there was no state for us", and `null` to represent "this is not a restored run".
[jira] [Commented] (FLINK-7245) Enhance the operators to support holding back watermarks
[ https://issues.apache.org/jira/browse/FLINK-7245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16132174#comment-16132174 ] ASF GitHub Bot commented on FLINK-7245: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4530 I think the approach is OK, though I personally would have preferred to make the instance variable protected and override `processWatermark()` to minimise the impact on `AbstractStreamOperator`, which now has an extra method call (though that will probably be inlined). > Enhance the operators to support holding back watermarks > > > Key: FLINK-7245 > URL: https://issues.apache.org/jira/browse/FLINK-7245 > Project: Flink > Issue Type: New Feature > Components: DataStream API >Reporter: Xingcan Cui >Assignee: Xingcan Cui > > Currently, watermarks are applied and emitted instantly by the > {{AbstractStreamOperator}}. > {code:java} > public void processWatermark(Watermark mark) throws Exception { > if (timeServiceManager != null) { > timeServiceManager.advanceWatermark(mark); > } > output.emitWatermark(mark); > } > {code} > Some calculation results (with timestamp fields) triggered by these > watermarks (e.g., join or aggregate results) may be regarded as late by > the downstream operators, since their timestamps are necessarily less than or equal to > the watermarks that triggered them. > This issue aims to add another "working mode", which supports holding back > watermarks, to the current operators. These watermarks should be blocked and > stored by the operators until all the corresponding newly generated results are > emitted. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4530: [FLINK-7245] [stream] Support holding back watermarks wit...
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4530 I think the approach is OK, though I personally would have preferred to make the instance variable protected and override `processWatermark()` to minimise the impact on `AbstractStreamOperator`, which now has an extra method call (though that will probably be inlined).
[jira] [Commented] (FLINK-7442) Add option for using a child-first classloader for loading user code
[ https://issues.apache.org/jira/browse/FLINK-7442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16132172#comment-16132172 ] ASF GitHub Bot commented on FLINK-7442: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4554 Yes, I created an alternative PR for that: https://github.com/apache/flink/pull/4564 My only worry is that all the tests in that PR would also pass with the other child-first class loader implementation from the RocksDB test, meaning that we don't actually have coverage for something that I discovered by having the class loader in the client. If we're fine with that I will close this PR and merge the other one once it's reviewed. > Add option for using a child-first classloader for loading user code > > > Key: FLINK-7442 > URL: https://issues.apache.org/jira/browse/FLINK-7442 > Project: Flink > Issue Type: Improvement > Components: Local Runtime >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4554: [FLINK-7442] Add option for using child-first classloader...
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4554 Yes, I created an alternative PR for that: https://github.com/apache/flink/pull/4564 My only worry is that all the tests in that PR would also pass with the other child-first class loader implementation from the RocksDB test, meaning that we don't actually have coverage for something that I discovered by having the class loader in the client. If we're fine with that, I will close this PR and merge the other one once it's reviewed.
[jira] [Commented] (FLINK-7442) Add option for using a child-first classloader for loading user code
[ https://issues.apache.org/jira/browse/FLINK-7442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16132168#comment-16132168 ] ASF GitHub Bot commented on FLINK-7442: --- GitHub user aljoscha opened a pull request: https://github.com/apache/flink/pull/4564 [FLINK-7442] Add option for using child-first classloader for loading user code This is an alternative to #4554 that does not make the client class loader configurable. ## What is the purpose of the change This PR introduces a new core option (`classloader.resolve-order: child-first`) that allows using a child-first class loader for user code. The default is still to use a parent-first class loader. This also does a minor refactoring in the way the blob manager retrieves the cleanup interval. It's now also read from the `Configuration`, since we already have the `Configuration` for the class loader settings. ## Brief change log - Introduce new option - Pass `Configuration` through to all places where we previously created a user class loader - Instantiate the correct class loader based on config ## Verifying this change This PR introduces new end-to-end tests that verify the new feature in a complete Flink workflow, including starting the program using `bin/flink run`. ## Does this pull request potentially affect one of the following parts: This affects the class loader, which is quite important to get right.
## Documentation - the new flag is documented in the config documentation You can merge this pull request into a Git repository by running: $ git pull https://github.com/aljoscha/flink jira-7441-child-first-classloader-alternative Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4564.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4564 commit 33bd4cf5160ec64cbaace876b305922b804cc3a1 Author: Aljoscha Krettek Date: 2017-08-14T12:53:14Z [FLINK-7442] Add option for using a child-first classloader for loading user code > Add option for using a child-first classloader for loading user code > > > Key: FLINK-7442 > URL: https://issues.apache.org/jira/browse/FLINK-7442 > Project: Flink > Issue Type: Improvement > Components: Local Runtime >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek >
[GitHub] flink pull request #4564: [FLINK-7442] Add option for using child-first clas...
GitHub user aljoscha opened a pull request: https://github.com/apache/flink/pull/4564 [FLINK-7442] Add option for using child-first classloader for loading user code This is an alternative to #4554 that does not make the client class loader configurable. ## What is the purpose of the change This PR introduces a new core option (`classloader.resolve-order: child-first`) that allows using a child-first class loader for user code. The default is still to use a parent-first class loader. This also does a minor refactoring in the way the blob manager retrieves the cleanup interval. It's now also read from the `Configuration`, since we already have the `Configuration` for the class loader settings. ## Brief change log - Introduce new option - Pass `Configuration` through to all places where we previously created a user class loader - Instantiate the correct class loader based on config ## Verifying this change This PR introduces new end-to-end tests that verify the new feature in a complete Flink workflow, including starting the program using `bin/flink run`. ## Does this pull request potentially affect one of the following parts: This affects the class loader, which is quite important to get right. ## Documentation - the new flag is documented in the config documentation You can merge this pull request into a Git repository by running: $ git pull https://github.com/aljoscha/flink jira-7441-child-first-classloader-alternative Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4564.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4564 commit 33bd4cf5160ec64cbaace876b305922b804cc3a1 Author: Aljoscha Krettek Date: 2017-08-14T12:53:14Z [FLINK-7442] Add option for using a child-first classloader for loading user code
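The new `classloader.resolve-order: child-first` option reverses the usual Java delegation order for user code (parent-first remains the default). As a rough illustration of what "child-first" means, and not the class loader from this PR, the sketch below subclasses `java.net.URLClassLoader`: it looks a class up in the user-code URLs first and only falls back to the parent when that fails. The class name `ChildFirstClassLoaderSketch` and the `alwaysParentFirst` prefix list are hypothetical; keeping core packages such as `java.` parent-first is an assumption made so that user code and the framework still share those types.

```java
import java.net.URL;
import java.net.URLClassLoader;

/**
 * Sketch of a child-first class loader (hypothetical; not Flink's implementation).
 * Classes are looked up in this loader's URLs first; the parent is consulted only for
 * "always parent-first" prefixes or when the class is not found locally.
 */
class ChildFirstClassLoaderSketch extends URLClassLoader {

    // Prefixes that are always delegated to the parent (illustrative assumption).
    private final String[] alwaysParentFirst;

    ChildFirstClassLoaderSketch(URL[] urls, ClassLoader parent, String... alwaysParentFirst) {
        super(urls, parent);
        this.alwaysParentFirst = alwaysParentFirst;
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        synchronized (getClassLoadingLock(name)) {
            // Reuse a class this loader has already defined.
            Class<?> c = findLoadedClass(name);
            if (c == null) {
                if (isParentFirst(name)) {
                    // Regular parent-first delegation for shared/core classes.
                    return super.loadClass(name, resolve);
                }
                try {
                    // Child first: search this loader's own URLs.
                    c = findClass(name);
                } catch (ClassNotFoundException e) {
                    // Not part of the user code: fall back to the parent.
                    return super.loadClass(name, resolve);
                }
            }
            if (resolve) {
                resolveClass(c);
            }
            return c;
        }
    }

    private boolean isParentFirst(String name) {
        if (name.startsWith("java.")) {
            return true; // custom loaders may not define java.* classes anyway
        }
        for (String prefix : alwaysParentFirst) {
            if (name.startsWith(prefix)) {
                return true;
            }
        }
        return false;
    }

    @Override
    public URL getResource(String name) {
        // Resources follow the same child-first order.
        URL url = findResource(name);
        return url != null ? url : super.getResource(name);
    }
}
```

A loader like this would typically be created with the job JAR URLs and the framework's class loader as parent. The parent-first prefixes are the main design lever: any type that crosses the boundary between user code and framework generally has to come from a single loader.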
[jira] [Commented] (FLINK-7479) Support to retrieve the past event by an offset
[ https://issues.apache.org/jira/browse/FLINK-7479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16132141#comment-16132141 ] ASF GitHub Bot commented on FLINK-7479: --- GitHub user dianfu opened a pull request: https://github.com/apache/flink/pull/4563 [FLINK-7479] [cep] Support to retrieve the past event by an offset ## What is the purpose of the change *Currently, it is already possible to retrieve events matched to the specified pattern in IterativeCondition.Context. However, there are also requirements to retrieve events by a physical offset, and the retrieved events need not have matched any pattern.* ## Brief change log - *Add API retain() in Pattern* - *Buffer the past events in NFA.process* - *Access the past events by the newly added API getEventByOffset in IterativeCondition.Context* ## Verifying this change This change added tests and can be verified as follows: - *Added test in IterativeConditionsITCase* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) ## Documentation - Does this pull request introduce a new feature? (yes) - If yes, how is the feature documented? (docs / JavaDocs) You can merge this pull request into a Git repository by running: $ git pull https://github.com/dianfu/flink FLINK-7479 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4563.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4563 commit e9feb55e6c2f7d32ec6266049be0ea9bbff967b2 Author: Dian Fu Date: 2017-08-18T12:50:25Z [FLINK-7479] [cep] Support to retrieve the past event by an offset > Support to retrieve the past event by an offset > > > Key: FLINK-7479 > URL: https://issues.apache.org/jira/browse/FLINK-7479 > Project: Flink > Issue Type: Sub-task > Components: CEP, Table API & SQL >Reporter: Dian Fu >Assignee: Dian Fu > > Currently, it is already possible to retrieve events matched to the specified > pattern in {{IterativeCondition.Context}}. However, there are also requirements > to retrieve events by a physical offset, and the retrieved events need not have > matched any pattern.
[GitHub] flink pull request #4563: [FLINK-7479] [cep] Support to retrieve the past ev...
GitHub user dianfu opened a pull request: https://github.com/apache/flink/pull/4563 [FLINK-7479] [cep] Support to retrieve the past event by an offset ## What is the purpose of the change *Currently, it is already possible to retrieve events matched to the specified pattern in IterativeCondition.Context. However, there are also requirements to retrieve events by a physical offset, and the retrieved events need not have matched any pattern.* ## Brief change log - *Add API retain() in Pattern* - *Buffer the past events in NFA.process* - *Access the past events by the newly added API getEventByOffset in IterativeCondition.Context* ## Verifying this change This change added tests and can be verified as follows: - *Added test in IterativeConditionsITCase* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) ## Documentation - Does this pull request introduce a new feature? (yes) - If yes, how is the feature documented? (docs / JavaDocs) You can merge this pull request into a Git repository by running: $ git pull https://github.com/dianfu/flink FLINK-7479 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4563.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4563 commit e9feb55e6c2f7d32ec6266049be0ea9bbff967b2 Author: Dian Fu Date: 2017-08-18T12:50:25Z [FLINK-7479] [cep] Support to retrieve the past event by an offset
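The change log above describes buffering past events in `NFA.process` and reading them through `getEventByOffset` on `IterativeCondition.Context`. A minimal plain-Java sketch of that buffering idea follows. It assumes a fixed capacity (standing in for whatever `retain()` configures) and that offset 1 means the immediately preceding event. `PastEventBuffer` is a hypothetical name, and this is not the PR's implementation.

```java
import java.util.ArrayDeque;
import java.util.Iterator;

/**
 * Hypothetical bounded buffer of the most recent events, addressable by physical
 * offset regardless of whether the events matched any pattern.
 */
class PastEventBuffer<T> {
    private final int capacity;
    private final ArrayDeque<T> events = new ArrayDeque<>();

    PastEventBuffer(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    /** Records a processed event, evicting the oldest one once the buffer is full. */
    void add(T event) {
        if (events.size() == capacity) {
            events.removeFirst();
        }
        events.addLast(event);
    }

    /**
     * Returns the event {@code offset} positions back (1 = most recently added),
     * or null if it was never buffered or has already been evicted.
     */
    T getByOffset(int offset) {
        if (offset < 1 || offset > events.size()) {
            return null;
        }
        Iterator<T> newestFirst = events.descendingIterator();
        T current = null;
        for (int i = 0; i < offset; i++) {
            current = newestFirst.next();
        }
        return current;
    }
}
```

In this sketch an operator would evaluate conditions for the current event first (calling `getByOffset`) and only then `add` it, so the lookback never includes the event being evaluated.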
[jira] [Created] (FLINK-7479) Support to retrieve the past event by an offset
Dian Fu created FLINK-7479: -- Summary: Support to retrieve the past event by an offset Key: FLINK-7479 URL: https://issues.apache.org/jira/browse/FLINK-7479 Project: Flink Issue Type: Sub-task Reporter: Dian Fu Assignee: Dian Fu Currently, it is already possible to retrieve events matched to the specified pattern in {{IterativeCondition.Context}}. However, there are also requirements to retrieve events by a physical offset, and the retrieved events need not have matched any pattern.
[jira] [Commented] (FLINK-7473) Possible Leak in GlobalWindows
[ https://issues.apache.org/jira/browse/FLINK-7473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16132125#comment-16132125 ] Nico Kruber commented on FLINK-7473: Another fix: set up event time properly (which is the reason for max int in your case). [Flink 1.3 Docs: Event Time|https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/event_time.html] states: "... the program needs to either use sources that directly define event time for the data and emit watermarks themselves, or the program must inject a Timestamp Assigner & Watermark Generator after the sources." Various sub-topics go into more detail, e.g. [Flink 1.3 Docs: Generating Timestamps / Watermarks|https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/event_timestamps_watermarks.html]. > Possible Leak in GlobalWindows > -- > > Key: FLINK-7473 > URL: https://issues.apache.org/jira/browse/FLINK-7473 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.3.2 > Environment: See attached project >Reporter: Steve Jerman > Attachments: timerIssue.zip > > > Hi, > I have been wrestling with an issue with GlobalWindows. It seems like it leaks > instances of InternalTimer. > I can't tell if it's a bug or my code, so I created a 'minimal' project that > has the issue... > If you run the unit test in the attached project and then monitor the heap, you will see > that the number of InternalTimers continually increases. I added code to > explicitly delete them... doesn't seem to help. > If I comment out registerEventTimeTimer ... no leak :) > My suspicion is that PURGE/FIRE_AND_PURGE is leaving the timer in limbo. > Steve
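Event-time timers fire only when the watermark passes their timestamp, which is why Nico points to assigning timestamps and generating watermarks after the source. One common strategy is bounded out-of-orderness: the watermark trails the largest timestamp seen so far by a fixed delay. The plain-Java sketch below shows only that arithmetic and assumes millisecond timestamps. The class name is hypothetical. In a Flink 1.3 job this logic would go into a timestamp/watermark assigner, as described in the linked docs, rather than a standalone class.

```java
/**
 * Plain-Java sketch of a bounded-out-of-orderness watermark strategy (no Flink dependency).
 * The watermark trails the largest event timestamp seen so far by a fixed delay.
 */
class BoundedOutOfOrdernessWatermarks {
    private final long maxOutOfOrdernessMillis;
    private long maxTimestampSeen = Long.MIN_VALUE;

    BoundedOutOfOrdernessWatermarks(long maxOutOfOrdernessMillis) {
        if (maxOutOfOrdernessMillis < 0) {
            throw new IllegalArgumentException("delay must not be negative");
        }
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
    }

    /** Called per event with its event-time timestamp; returns it unchanged and tracks the max. */
    long onEvent(long eventTimestampMillis) {
        maxTimestampSeen = Math.max(maxTimestampSeen, eventTimestampMillis);
        return eventTimestampMillis;
    }

    /** Current watermark, or Long.MIN_VALUE if subtracting the delay would underflow. */
    long currentWatermark() {
        if (maxTimestampSeen < Long.MIN_VALUE + maxOutOfOrdernessMillis) {
            return Long.MIN_VALUE; // e.g. before the first event has been seen
        }
        return maxTimestampSeen - maxOutOfOrdernessMillis;
    }
}
```

The watermark never moves backwards here, because it is derived from a running maximum. A late event (such as 4000 after 5000) is simply not reflected in the watermark.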
[jira] [Commented] (FLINK-7129) Dynamically changing patterns
[ https://issues.apache.org/jira/browse/FLINK-7129?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16132111#comment-16132111 ] Dawid Wysakowicz commented on FLINK-7129: - Hi [~dian.fu] Yes, the idea was to enable multiple patterns in a single stream with option 2. Unfortunately, while implementing it, I stumbled across the problem that it is currently impossible to distribute a Pattern to all operator instances for all keys. There might also be a problem with keys that had not been seen before a new Pattern arrived. We could implement a solution where multiple static Patterns are applied to the same stream, but I don't see much advantage over the current solution. > Dynamically changing patterns > - > > Key: FLINK-7129 > URL: https://issues.apache.org/jira/browse/FLINK-7129 > Project: Flink > Issue Type: New Feature > Components: CEP >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz > > An umbrella task for introducing a mechanism for injecting patterns through > coStream
[jira] [Commented] (FLINK-7471) Improve bounded OVER support non-retract method AGG
[ https://issues.apache.org/jira/browse/FLINK-7471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16132090#comment-16132090 ] sunjincheng commented on FLINK-7471: We know that recomputing all aggregates from scratch for each record performs poorly. That's why we use the {{retract}} method to implement the BOUNDED OVER WINDOW, and I think that's the best choice. We have also added a comment to the {{retract}} method in {{AggregateFunction}}: {code} ... This(retract) function must be implemented for datastream bounded over aggregate. {code} So I think we made the right decision in the aggregate design and the implementation of bounded OVER. However, while using the {{retract}} method is an optimized implementation of the bounded OVER window, we cannot force users to implement it. In particular, if the business scenario is an append-only table that never generates retractions, the user may define an aggregate function without a retract method, and we should not require one just because of how the bounded OVER window is implemented. I also think we may not need much refactoring of the original code; we can add support for non-retract aggregates on top of it. [~fhueske] I think your concern is reasonable, but I do not really like that the implementation of the OVER window forces users to implement {{AggregateFunction#retract}}. > Improve bounded OVER support non-retract method AGG > --- > > Key: FLINK-7471 > URL: https://issues.apache.org/jira/browse/FLINK-7471 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: sunjincheng >Assignee: sunjincheng > > Currently BOUNDED OVER WINDOW only supports aggregates that have a {{retract}} > method. This JIRA will add support for aggregates without one. > What do you think? [~fhueske]
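The trade-off discussed here can be illustrated with a SUM over a ROWS frame of fixed size. With `retract`, each row costs one accumulate and at most one retract. Without it, the accumulator has to be rebuilt from the buffered rows every time. The plain-Java sketch below compares the two under those assumptions. The class names are hypothetical and this is not Flink's generated aggregation code.

```java
import java.util.ArrayDeque;

/** Consumes the next row's value and returns the aggregate over the current frame. */
interface OverAggregator {
    long next(long value);
}

/** Uses accumulate + retract: O(1) work per row. */
class RetractingSum implements OverAggregator {
    private final int frameSize; // rows in the frame, including the current one
    private final ArrayDeque<Long> frame = new ArrayDeque<>();
    private long acc;

    RetractingSum(int frameSize) {
        if (frameSize <= 0) throw new IllegalArgumentException("frameSize must be positive");
        this.frameSize = frameSize;
    }

    @Override
    public long next(long value) {
        frame.addLast(value);
        acc += value;                   // accumulate the new row
        if (frame.size() > frameSize) {
            acc -= frame.removeFirst(); // retract the row that left the frame
        }
        return acc;
    }
}

/** No retract available: rebuild the accumulator from the buffered rows, O(frameSize) per row. */
class RecomputingSum implements OverAggregator {
    private final int frameSize;
    private final ArrayDeque<Long> frame = new ArrayDeque<>();

    RecomputingSum(int frameSize) {
        if (frameSize <= 0) throw new IllegalArgumentException("frameSize must be positive");
        this.frameSize = frameSize;
    }

    @Override
    public long next(long value) {
        frame.addLast(value);
        if (frame.size() > frameSize) {
            frame.removeFirst();        // drop the old row without retracting it
        }
        long acc = 0;                   // fresh accumulator for every row
        for (long v : frame) {
            acc += v;                   // accumulate only
        }
        return acc;
    }
}
```

Both variants buffer the rows of the frame, so memory use is similar. Only the per-row CPU cost of the non-retract path grows with the frame size.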
[jira] [Commented] (FLINK-7423) Always reuse an instance to get elements from the inputFormat
[ https://issues.apache.org/jira/browse/FLINK-7423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16132062#comment-16132062 ] ASF GitHub Bot commented on FLINK-7423: --- Github user greghogan commented on the issue: https://github.com/apache/flink/pull/4525 @StephanEwen why are `null` values permitted if not in the contract of the input formats? > Always reuse an instance to get elements from the inputFormat > --- > > Key: FLINK-7423 > URL: https://issues.apache.org/jira/browse/FLINK-7423 > Project: Flink > Issue Type: Bug > Components: DataStream API >Reporter: Xu Pingyong >Assignee: Xu Pingyong > > In InputFormatSourceFunction.java:
> {code:java}
> OUT nextElement = serializer.createInstance();
> while (isRunning) {
>     format.open(splitIterator.next());
>     // for each element we also check if cancel
>     // was called by checking the isRunning flag
>     while (isRunning && !format.reachedEnd()) {
>         nextElement = format.nextRecord(nextElement);
>         if (nextElement != null) {
>             ctx.collect(nextElement);
>         } else {
>             break;
>         }
>     }
>     format.close();
>     completedSplitsCounter.inc();
>     if (isRunning) {
>         isRunning = splitIterator.hasNext();
>     }
> }
> {code}
> The format may return another element or null from nextRecord, which may cause an exception.
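In the quoted loop, `nextElement` serves as both the reuse object passed to `nextRecord` and the value it returns. A `null` on one split therefore means that the first call on the next split receives `null` as its reuse object. The sketch below shows the "always reuse one instance" idea. It assumes a simplified, hypothetical `ReusingFormat` interface (not Flink's `InputFormat`) and keeps the original rule that `null` ends the current split.

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Simplified, hypothetical stand-in for an input format that supports object reuse. */
interface ReusingFormat<T> {
    void open(String split);
    boolean reachedEnd();
    /** May fill and return {@code reuse}, return another instance, or return null. */
    T nextRecord(T reuse);
    void close();
}

final class ReuseSafeReader {
    private ReuseSafeReader() {}

    static <T> void readSplits(ReusingFormat<T> format, List<String> splits,
                               Supplier<T> createInstance, Consumer<T> out) {
        // One dedicated reuse instance that is never overwritten by a returned value,
        // so a null (or foreign) return cannot leak into the next nextRecord call.
        final T reuse = createInstance.get();
        for (String split : splits) {
            format.open(split);
            try {
                while (!format.reachedEnd()) {
                    T record = format.nextRecord(reuse);
                    if (record == null) {
                        break; // same rule as the original loop: null ends this split
                    }
                    out.accept(record);
                }
            } finally {
                format.close();
            }
        }
    }
}
```

Because the same instance may be handed to `out` repeatedly, a consumer that keeps records beyond the call must copy them first.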
[GitHub] flink issue #4525: [FLINK-7423] Always reuse an instance to get elements fro...
Github user greghogan commented on the issue: https://github.com/apache/flink/pull/4525 @StephanEwen why are `null` values permitted if not in the contract of the input formats?
[jira] [Created] (FLINK-7478) Update documentation for sql insert and api change in TableAPI & SQL
lincoln.lee created FLINK-7478: -- Summary: Update documentation for sql insert and api change in TableAPI & SQL Key: FLINK-7478 URL: https://issues.apache.org/jira/browse/FLINK-7478 Project: Flink Issue Type: New Feature Reporter: lincoln.lee Assignee: lincoln.lee Priority: Minor
[jira] [Commented] (FLINK-7358) Add implicitly converts support for User-defined function
[ https://issues.apache.org/jira/browse/FLINK-7358?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16132056#comment-16132056 ] ASF GitHub Bot commented on FLINK-7358: --- Github user sunjincheng121 commented on the issue: https://github.com/apache/flink/pull/4534 Hi @fhueske, thank you for reminding me :-). I have updated the description; feel free to tell me if anything in it is inappropriate. Thanks, jincheng > Add implicitly converts support for User-defined function > -- > > Key: FLINK-7358 > URL: https://issues.apache.org/jira/browse/FLINK-7358 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: sunjincheng >Assignee: sunjincheng > > Currently, if a user defines a UDF as follows: > {code} > object Func extends ScalarFunction { > def eval(a: Int, b: Long): String = { > ... > } > } > {code} > and the table schema is (a: Int, b: Int, c: String), then we cannot call > the UDF `Func('a, 'b)`. So > I want to add implicit conversions when calling a UDF. The implicit conversion rule > is: > BYTE_TYPE_INFO -> SHORT_TYPE_INFO -> INT_TYPE_INFO -> LONG_TYPE_INFO -> > FLOAT_TYPE_INFO -> DOUBLE_TYPE_INFO > *Note: > This JIRA covers only the Table API; SQL will be fixed in > https://issues.apache.org/jira/browse/CALCITE-1908.* > What do you think? [~fhueske]
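Because the proposed rule is a single chain, it can be expressed as an ordering: an argument type may be passed where a parameter type is expected if it appears at or before that type in the chain. The plain-Java sketch below assumes exactly that chain. `NumericType` and `isCallable` are hypothetical names, not Table API code.

```java
/**
 * Hypothetical enum mirroring the widening chain proposed in FLINK-7358:
 * BYTE -> SHORT -> INT -> LONG -> FLOAT -> DOUBLE (declaration order = chain order).
 */
enum NumericType {
    BYTE, SHORT, INT, LONG, FLOAT, DOUBLE;

    /** True if a value of this type may be implicitly converted to {@code target}. */
    boolean canWidenTo(NumericType target) {
        return ordinal() <= target.ordinal();
    }
}

final class UdfArgs {
    private UdfArgs() {}

    /** True if every argument type can be widened to the corresponding eval() parameter type. */
    static boolean isCallable(NumericType[] params, NumericType[] args) {
        if (params.length != args.length) {
            return false;
        }
        for (int i = 0; i < params.length; i++) {
            if (!args[i].canWidenTo(params[i])) {
                return false;
            }
        }
        return true;
    }
}
```

Take `eval(a: Int, b: Long)` and the schema `(a: Int, b: Int, c: String)` from the description. `isCallable({INT, LONG}, {INT, INT})` returns true, because INT widens to LONG. As with Java's own widening primitive conversions, the LONG to FLOAT step can lose precision.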
[GitHub] flink issue #4534: [FLINK-7358][table]Add implicitly converts support for Us...
Github user sunjincheng121 commented on the issue: https://github.com/apache/flink/pull/4534 Hi @fhueske, thank you for reminding me :-). I have updated the description; feel free to tell me if anything in it is inappropriate. Thanks, jincheng
[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG
[ https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16132042#comment-16132042 ] ASF GitHub Bot commented on FLINK-7206: --- Github user kaibozhou commented on the issue: https://github.com/apache/flink/pull/4355 1. Use HeapMapView/HeapListView as default implementation 2. add initialize and cleanUp interface to GeneratedAggregations > Implementation of DataView to support state access for UDAGG > > > Key: FLINK-7206 > URL: https://issues.apache.org/jira/browse/FLINK-7206 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Kaibo Zhou >Assignee: Kaibo Zhou > > Implementation of MapView and ListView to support state access for UDAGG.
[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs
[ https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16132041#comment-16132041 ] ASF GitHub Bot commented on FLINK-7068: --- Github user NicoK commented on the issue: https://github.com/apache/flink/pull/4358 Rebased onto `master`, but had to drag in #4402 early to fix the end-to-end tests failing due to spurious warnings. The test failure you observed was actually a test instability introduced with #4238 for which I added a hotfix to this PR. > change BlobService sub-classes for permanent and transient BLOBs > > > Key: FLINK-7068 > URL: https://issues.apache.org/jira/browse/FLINK-7068 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, Network >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > > A {{PermanentBlobStore}} should cover use cases for BLOBs that are > permanently stored for a job's lifetime (HA and non-HA). > A {{TransientBlobStore}} should reflect BLOB offloading for logs, RPC, etc., > which does not even have to be backed by files.
[GitHub] flink issue #4355: [FLINK-7206] [table] Implementation of DataView to suppor...
Github user kaibozhou commented on the issue: https://github.com/apache/flink/pull/4355 1. Use HeapMapView/HeapListView as default implementation 2. add initialize and cleanUp interface to GeneratedAggregations
[GitHub] flink issue #4358: [FLINK-7068][blob] change BlobService sub-classes for per...
Github user NicoK commented on the issue: https://github.com/apache/flink/pull/4358 Rebased onto `master`, but had to drag in #4402 early to fix the end-to-end tests failing due to spurious warnings. The test failure you observed was actually a test instability introduced with #4238 for which I added a hotfix to this PR.
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user kaibozhou commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r133925176 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -179,13 +214,19 @@ class AggregationCodeGenerator( | ${aggMapping(i)}, | (${accTypes(i)}) accs.getField($i));""".stripMargin } else { +val setDataView = if (accConfig.isDefined && accConfig.get.isUseState) { --- End diff -- Yes, it makes sense.
[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG
[ https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16132016#comment-16132016 ] ASF GitHub Bot commented on FLINK-7206: --- Github user kaibozhou commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r133925097 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -405,6 +481,17 @@ class AggregationCodeGenerator( } } +val aggFuncCode = Seq( + genSetAggregationResults, + genAccumulate, + genRetract, + genCreateAccumulators, + genSetForwardedFields, + genSetConstantFlags, + genCreateOutputRow, + genMergeAccumulatorsPair, + genResetAccumulator).mkString("\n") --- End diff -- It makes sense. I have looked at ProcessFunctionWithCleanupState; cleanUp should be called whenever cleanupState is called. > Implementation of DataView to support state access for UDAGG > > > Key: FLINK-7206 > URL: https://issues.apache.org/jira/browse/FLINK-7206 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Kaibo Zhou >Assignee: Kaibo Zhou > > Implementation of MapView and ListView to support state access for UDAGG.
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user kaibozhou commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r133925097 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -405,6 +481,17 @@ class AggregationCodeGenerator( } } +val aggFuncCode = Seq( + genSetAggregationResults, + genAccumulate, + genRetract, + genCreateAccumulators, + genSetForwardedFields, + genSetConstantFlags, + genCreateOutputRow, + genMergeAccumulatorsPair, + genResetAccumulator).mkString("\n") --- End diff -- It makes sense. I have looked at ProcessFunctionWithCleanupState; cleanUp should be called whenever cleanupState is called.
[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG
[ https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16132017#comment-16132017 ] ASF GitHub Bot commented on FLINK-7206: --- Github user kaibozhou commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r133925176 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -179,13 +214,19 @@ class AggregationCodeGenerator( | ${aggMapping(i)}, | (${accTypes(i)}) accs.getField($i));""".stripMargin } else { +val setDataView = if (accConfig.isDefined && accConfig.get.isUseState) { --- End diff -- Yes, it makes sense. > Implementation of DataView to support state access for UDAGG > > > Key: FLINK-7206 > URL: https://issues.apache.org/jira/browse/FLINK-7206 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Kaibo Zhou >Assignee: Kaibo Zhou > > Implementation of MapView and ListView to support state access for UDAGG.
[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...
Github user kaibozhou commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r133924725 --- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java --- @@ -135,4 +138,172 @@ public void retract(WeightedAvgAccum accumulator, int iValue, int iWeight) { accumulator.count -= iWeight; } } + + /** +* CountDistinct accumulator. +*/ + public static class CountDistinctAccum { + public MapView map; + public long count; + } + + /** +* CountDistinct aggregate. +*/ + public static class CountDistinct extends AggregateFunction { --- End diff -- Yes, CountDistinct is only used as a test case here.
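The test-only `CountDistinct` UDAGG keeps a map in its accumulator, and backing such a map with state is what `MapView` is meant for. The bookkeeping can be sketched in plain Java. The sketch assumes `String` values and uses a `HashMap` as a stand-in for `MapView`; the generic parameters of the real test class are not visible in the diff above, so these types are assumptions. All class names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Accumulator: per-value occurrence counts plus the number of distinct values. */
class CountDistinctAccumSketch {
    // Stand-in for the MapView field; a HashMap mirrors a heap-backed view.
    final Map<String, Integer> map = new HashMap<>();
    long count;
}

/** Hypothetical plain-Java version of a COUNT DISTINCT aggregate with retraction. */
final class CountDistinctSketch {
    private CountDistinctSketch() {}

    static void accumulate(CountDistinctAccumSketch acc, String value) {
        Integer prev = acc.map.get(value);
        if (prev == null) {
            acc.map.put(value, 1);
            acc.count += 1;               // first occurrence of this value
        } else {
            acc.map.put(value, prev + 1);
        }
    }

    static void retract(CountDistinctAccumSketch acc, String value) {
        Integer prev = acc.map.get(value);
        if (prev == null) {
            return;                       // nothing to retract
        }
        if (prev == 1) {
            acc.map.remove(value);
            acc.count -= 1;               // last occurrence removed
        } else {
            acc.map.put(value, prev - 1);
        }
    }

    static long getValue(CountDistinctAccumSketch acc) {
        return acc.count;
    }
}
```

Keeping per-value counts, not just a set, is what allows `retract` to decide when a value really disappears from the distinct set.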
[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG
[ https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16132010#comment-16132010 ] ASF GitHub Bot commented on FLINK-7206: --- Github user kaibozhou commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r133924725 --- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java --- @@ -135,4 +138,172 @@ public void retract(WeightedAvgAccum accumulator, int iValue, int iWeight) { accumulator.count -= iWeight; } } + + /** +* CountDistinct accumulator. +*/ + public static class CountDistinctAccum { + public MapView map; + public long count; + } + + /** +* CountDistinct aggregate. +*/ + public static class CountDistinct extends AggregateFunction { --- End diff -- Yes, CountDistinct is only used as a test case here. > Implementation of DataView to support state access for UDAGG > > > Key: FLINK-7206 > URL: https://issues.apache.org/jira/browse/FLINK-7206 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Kaibo Zhou >Assignee: Kaibo Zhou > > Implementation of MapView and ListView to support state access for UDAGG.
[jira] [Commented] (FLINK-7442) Add option for using a child-first classloader for loading user code
[ https://issues.apache.org/jira/browse/FLINK-7442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131997#comment-16131997 ] ASF GitHub Bot commented on FLINK-7442: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/4554 I think what you mentioned is one more reason to not use this in too many places for now, but only inside the TaskManager / Tasks. Let's introduce that as a tool that users can use to resolve conflicts and gather some feedback before we pull that into client / queryableStateClient / etc... > Add option for using a child-first classloader for loading user code > > > Key: FLINK-7442 > URL: https://issues.apache.org/jira/browse/FLINK-7442 > Project: Flink > Issue Type: Improvement > Components: Local Runtime >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek >
[GitHub] flink issue #4554: [FLINK-7442] Add option for using child-first classloader...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/4554 I think what you mentioned is one more reason to not use this in too many places for now, but only inside the TaskManager / Tasks. Let's introduce that as a tool that users can use to resolve conflicts and gather some feedback before we pull that into client / queryableStateClient / etc...
[jira] [Commented] (FLINK-6442) Extend TableAPI Support Sink Table Registration and ‘insert into’ Clause in SQL
[ https://issues.apache.org/jira/browse/FLINK-6442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131995#comment-16131995 ] ASF GitHub Bot commented on FLINK-6442: --- Github user lincoln-lil commented on a diff in the pull request: https://github.com/apache/flink/pull/3829#discussion_r133921697 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala --- @@ -502,26 +513,140 @@ abstract class TableEnvironment(val config: TableConfig) { * tEnv.sql(s"SELECT * FROM $table") * }}} * -* @param query The SQL query to evaluate. +* @param sql The SQL string to evaluate. * @return The result of the query as Table. */ - def sql(query: String): Table = { + @deprecated + def sql(sql: String): Table = { val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory) // parse the sql query -val parsed = planner.parse(query) +val parsed = planner.parse(sql) // validate the sql query val validated = planner.validate(parsed) // transform to a relational tree val relational = planner.rel(validated) - new Table(this, LogicalRelNode(relational.rel)) } /** +* Evaluates a SQL Select query on registered tables and retrieves the result as a +* [[Table]]. +* +* All tables referenced by the query must be registered in the TableEnvironment. But +* [[Table.toString]] will automatically register an unique table name and return the +* table name. So it allows to call SQL directly on tables like this: +* +* {{{ +* val table: Table = ... +* // the table is not registered to the table environment +* tEnv.sqlSelect(s"SELECT * FROM $table") +* }}} +* +* @param sql The SQL string to evaluate. +* @return The result of the query as Table or null of the DML insert operation. 
+*/ + def sqlQuery(sql: String): Table = { +val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory) +// parse the sql query +val parsed = planner.parse(sql) +if (null != parsed && parsed.getKind.belongsTo(SqlKind.QUERY)) { + // validate the sql query + val validated = planner.validate(parsed) + // transform to a relational tree + val relational = planner.rel(validated) + new Table(this, LogicalRelNode(relational.rel)) +} else { + throw new TableException( +"Unsupported sql query! sqlQuery Only accept SELECT, UNION, INTERSECT, EXCEPT, VALUES, " + --- End diff -- SqlParser.parseStmt() actually calls SqlParser.parseQuery(), so the two are the same and cannot help us distinguish the SQL type. That's why SqlKind is used here; SqlKind.QUERY consists of: SELECT, EXCEPT, INTERSECT, UNION, VALUES, ORDER_BY, EXPLICIT_TABLE. > Extend TableAPI Support Sink Table Registration and ‘insert into’ Clause in SQL > --- > > Key: FLINK-6442 > URL: https://issues.apache.org/jira/browse/FLINK-6442 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: lincoln.lee >Assignee: lincoln.lee >Priority: Minor > > Currently the TableAPI only has a registration method for source tables. > When we use SQL to write a streaming job, we need to add an additional part for > the sink, as the TableAPI does: > {code} > val sqlQuery = "SELECT * FROM MyTable WHERE _1 = 3" > val t = StreamTestData.getSmall3TupleDataStream(env) > tEnv.registerDataStream("MyTable", t) > // one way: invoke tableAPI’s writeToSink method directly > val result = tEnv.sql(sqlQuery) > result.writeToSink(new YourStreamSink) > // another way: convert to datastream first and then invoke addSink > val result = tEnv.sql(sqlQuery).toDataStream[Row] > result.addSink(new StreamITCase.StringSink) > {code} > From the API we can see that the sink table is always a derived table, because its > 'schema' is inferred from the result type of the upstream query.
> In traditional RDBMSs, which support DML syntax, a query with a target > output can be written like this: > {code} > insert into table target_table_name > [(column_name [ ,...n ])] > query > {code} > The equivalent form of the example above is as follows: > {code} > tEnv.registerTableSink("targetTable", new YourSink) > val sql = "INSERT INTO targetTable SELECT a, b, c FROM sourceTable" > val result = tEnv.sql(sql) > {code} > It is supported by Calcite’s grammar: > {code} > insert:( INSERT | UPSERT ) INTO tablePrimary > [ '(' column [, column ]* ')' ] > query > {code} >
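The `sqlQuery` check relies on Calcite classifying the parsed statement by kind. A self-contained Java sketch of the same idea follows; the `StatementKind` enum is a hypothetical stand-in for Calcite's `SqlKind`, and `IllegalArgumentException` stands in for `TableException`. It accepts only the members of the query category listed in the review comment and rejects anything else, such as `INSERT`:

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical, trimmed-down statement kinds standing in for Calcite's SqlKind.
enum StatementKind {
    SELECT, EXCEPT, INTERSECT, UNION, VALUES, ORDER_BY, EXPLICIT_TABLE, INSERT, DELETE
}

class StatementKindCheck {
    // Mirrors the SqlKind.QUERY membership listed in the review comment.
    static final Set<StatementKind> QUERY = EnumSet.of(
        StatementKind.SELECT, StatementKind.EXCEPT, StatementKind.INTERSECT,
        StatementKind.UNION, StatementKind.VALUES, StatementKind.ORDER_BY,
        StatementKind.EXPLICIT_TABLE);

    // Accept only query kinds and reject DML such as INSERT, like sqlQuery() in the diff.
    static StatementKind requireQuery(StatementKind kind) {
        if (kind == null || !QUERY.contains(kind)) {
            throw new IllegalArgumentException(
                "Unsupported SQL query: only query statements are accepted, got " + kind);
        }
        return kind;
    }
}
```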
[jira] [Commented] (FLINK-6442) Extend TableAPI Support Sink Table Registration and ‘insert into’ Clause in SQL
[ https://issues.apache.org/jira/browse/FLINK-6442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131996#comment-16131996 ] ASF GitHub Bot commented on FLINK-6442: --- Github user lincoln-lil commented on a diff in the pull request: https://github.com/apache/flink/pull/3829#discussion_r133920847 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala --- @@ -19,9 +19,21 @@ package org.apache.flink.table.api import _root_.java.io.Serializable + import org.apache.flink.api.common.time.Time -class QueryConfig private[table] extends Serializable {} +class QueryConfig private[table] extends Serializable { --- End diff -- There is more than one place that needs to get the default QueryConfig from the table env. > Extend TableAPI Support Sink Table Registration and ‘insert into’ Clause in > SQL > --- > > Key: FLINK-6442 > URL: https://issues.apache.org/jira/browse/FLINK-6442 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: lincoln.lee >Assignee: lincoln.lee >Priority: Minor > > Currently the TableAPI only has a registration method for source tables. > When we use SQL to write a streaming job, we need to add an additional part for > the sink, as the TableAPI does: > {code} > val sqlQuery = "SELECT * FROM MyTable WHERE _1 = 3" > val t = StreamTestData.getSmall3TupleDataStream(env) > tEnv.registerDataStream("MyTable", t) > // one way: invoke tableAPI’s writeToSink method directly > val result = tEnv.sql(sqlQuery) > result.writeToSink(new YourStreamSink) > // another way: convert to datastream first and then invoke addSink > val result = tEnv.sql(sqlQuery).toDataStream[Row] > result.addSink(new StreamITCase.StringSink) > {code} > From the API we can see that the sink table is always a derived table, because its > 'schema' is inferred from the result type of the upstream query.
> In traditional RDBMSs, which support DML syntax, a query with a target > output can be written like this: > {code} > insert into table target_table_name > [(column_name [ ,...n ])] > query > {code} > The equivalent form of the example above is as follows: > {code} > tEnv.registerTableSink("targetTable", new YourSink) > val sql = "INSERT INTO targetTable SELECT a, b, c FROM sourceTable" > val result = tEnv.sql(sql) > {code} > It is supported by Calcite’s grammar: > {code} > insert:( INSERT | UPSERT ) INTO tablePrimary > [ '(' column [, column ]* ')' ] > query > {code} > I'd like to extend the Flink TableAPI to support such a feature. See the design doc: > https://goo.gl/n3phK5 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
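The proposal hinges on resolving the `INSERT INTO` target by name among previously registered sinks. Below is a minimal sketch of such a registry. It assumes rows are plain strings and uses hypothetical `RowSink` and `SinkCatalog` types, which are not the Table API's `TableSink` or `TableEnvironment`:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sink abstraction standing in for a TableSink.
interface RowSink {
    void write(List<String> rows);
}

class SinkCatalog {
    private final Map<String, RowSink> sinks = new HashMap<>();

    // Analogous to the proposed tEnv.registerTableSink("targetTable", sink).
    void registerTableSink(String name, RowSink sink) {
        if (sinks.putIfAbsent(name, sink) != null) {
            throw new IllegalStateException("Table sink already registered: " + name);
        }
    }

    // Analogous to "INSERT INTO name <query>": the target is looked up by name
    // and the query result (here simply a list of rows) is handed to it.
    void insertInto(String name, List<String> queryResult) {
        RowSink sink = sinks.get(name);
        if (sink == null) {
            throw new IllegalArgumentException("No table sink registered under: " + name);
        }
        sink.write(new ArrayList<>(queryResult));
    }
}
```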
[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...
Github user lincoln-lil commented on a diff in the pull request: https://github.com/apache/flink/pull/3829#discussion_r133920847 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala --- @@ -19,9 +19,21 @@ package org.apache.flink.table.api import _root_.java.io.Serializable + import org.apache.flink.api.common.time.Time -class QueryConfig private[table] extends Serializable {} +class QueryConfig private[table] extends Serializable { --- End diff -- There is more than one place that needs to get the default QueryConfig from the table env.
[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...
Github user lincoln-lil commented on a diff in the pull request: https://github.com/apache/flink/pull/3829#discussion_r133921697 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala --- @@ -502,26 +513,140 @@ abstract class TableEnvironment(val config: TableConfig) { * tEnv.sql(s"SELECT * FROM $table") * }}} * -* @param query The SQL query to evaluate. +* @param sql The SQL string to evaluate. * @return The result of the query as Table. */ - def sql(query: String): Table = { + @deprecated + def sql(sql: String): Table = { val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory) // parse the sql query -val parsed = planner.parse(query) +val parsed = planner.parse(sql) // validate the sql query val validated = planner.validate(parsed) // transform to a relational tree val relational = planner.rel(validated) - new Table(this, LogicalRelNode(relational.rel)) } /** +* Evaluates a SQL Select query on registered tables and retrieves the result as a +* [[Table]]. +* +* All tables referenced by the query must be registered in the TableEnvironment. But +* [[Table.toString]] will automatically register an unique table name and return the +* table name. So it allows to call SQL directly on tables like this: +* +* {{{ +* val table: Table = ... +* // the table is not registered to the table environment +* tEnv.sqlSelect(s"SELECT * FROM $table") +* }}} +* +* @param sql The SQL string to evaluate. +* @return The result of the query as Table or null of the DML insert operation. 
+*/ + def sqlQuery(sql: String): Table = { +val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory) +// parse the sql query +val parsed = planner.parse(sql) +if (null != parsed && parsed.getKind.belongsTo(SqlKind.QUERY)) { + // validate the sql query + val validated = planner.validate(parsed) + // transform to a relational tree + val relational = planner.rel(validated) + new Table(this, LogicalRelNode(relational.rel)) +} else { + throw new TableException( +"Unsupported sql query! sqlQuery Only accept SELECT, UNION, INTERSECT, EXCEPT, VALUES, " + --- End diff -- SqlParser.parseStmt() actually calls SqlParser.parseQuery(), so the two are the same and cannot help us distinguish the SQL type. That's why SqlKind is used here; SqlKind.QUERY consists of: SELECT, EXCEPT, INTERSECT, UNION, VALUES, ORDER_BY, EXPLICIT_TABLE.
[jira] [Commented] (FLINK-7423) Always reuse an instance to get elements from the inputFormat
[ https://issues.apache.org/jira/browse/FLINK-7423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131993#comment-16131993 ] ASF GitHub Bot commented on FLINK-7423: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/4525 The logic currently does not conform to the contract of the input formats. A return value of null is not an "end of split" indicator. Also, the description mentions that this adds a test, which I cannot find in the diff... > Always reuse an instance to get elements from the inputFormat > --- > > Key: FLINK-7423 > URL: https://issues.apache.org/jira/browse/FLINK-7423 > Project: Flink > Issue Type: Bug > Components: DataStream API >Reporter: Xu Pingyong >Assignee: Xu Pingyong > > In InputFormatSourceFunction.java: > {code:java} > OUT nextElement = serializer.createInstance(); > while (isRunning) { > format.open(splitIterator.next()); > // for each element we also check if cancel > // was called by checking the isRunning flag > while (isRunning && !format.reachedEnd()) { > nextElement = > format.nextRecord(nextElement); > if (nextElement != null) { > ctx.collect(nextElement); > } else { > break; > } > } > format.close(); > completedSplitsCounter.inc(); > if (isRunning) { > isRunning = splitIterator.hasNext(); > } > } > {code} > The format may return another element or null from nextRecord, which may cause an exception. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
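The review comment says that only `reachedEnd()` signals the end of a split and that a `null` from `nextRecord` is not an end marker. Under that assumption, a split-reading loop could skip `null` results and keep a dedicated reuse instance, so a `null` never becomes the next reuse argument. `RecordFormat` and `SplitReader` are hypothetical stand-ins, not Flink's `InputFormat` or `InputFormatSourceFunction`:

```java
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical minimal stand-in for an input format.
interface RecordFormat<T> {
    boolean reachedEnd();
    T nextRecord(T reuse);
}

class SplitReader {
    // Reads one split; only reachedEnd() ends the loop, null results are skipped.
    static <T> void readSplit(RecordFormat<T> format, Supplier<T> createInstance, Consumer<T> emit) {
        // A dedicated reuse object: it is never overwritten by a null return value.
        final T reuse = createInstance.get();
        while (!format.reachedEnd()) {
            T record = format.nextRecord(reuse);
            if (record != null) {
                // Emit immediately; a mutable reused record must not be buffered uncopied.
                emit.accept(record);
            }
        }
    }
}
```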
[GitHub] flink issue #4525: [FLINK-7423] Always reuse an instance to get elements fro...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/4525 The logic currently does not conform to the contract of the input formats. A return value of null is not an "end of split" indicator. Also, the description mentions that this adds a test, which I cannot find in the diff...
[jira] [Created] (FLINK-7477) Use "hadoop classpath" to augment classpath when available
Aljoscha Krettek created FLINK-7477: --- Summary: Use "hadoop classpath" to augment classpath when available Key: FLINK-7477 URL: https://issues.apache.org/jira/browse/FLINK-7477 Project: Flink Issue Type: Bug Components: Startup Shell Scripts Reporter: Aljoscha Krettek Assignee: Aljoscha Krettek Currently, some cloud environments don't properly put the Hadoop jars into {{HADOOP_CLASSPATH}} (or don't set {{HADOOP_CLASSPATH}} at all). We should check in {{config.sh}} if the {{hadoop}} binary is on the path and augment our {{INTERNAL_HADOOP_CLASSPATHS}} with the result of {{hadoop classpath}} in our scripts. This will improve the out-of-the-box experience of users who would otherwise have to set {{HADOOP_CLASSPATH}} manually. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7475) ListState support update
[ https://issues.apache.org/jira/browse/FLINK-7475?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-7475: Component/s: DataStream API > ListState support update > > > Key: FLINK-7475 > URL: https://issues.apache.org/jira/browse/FLINK-7475 > Project: Flink > Issue Type: Improvement > Components: Core, DataStream API >Reporter: yf > > If I want to update the list, > I have to do two steps: > listState.clear() > for (Element e : myList) { > listState.add(e); > } > Why can't I update the state with: > listState.update(myList)? -- This message was sent by Atlassian JIRA (v6.4.14#64029)
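The request amounts to folding the two steps into one call. The sketch below uses a hypothetical `SimpleListState` interface (not Flink's `ListState`) to show `update` as a default method built from `clear()` and `add()`; a state backend could override it with a single write:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical list-state interface illustrating the request.
interface SimpleListState<T> {
    Iterable<T> get();
    void add(T value);
    void clear();

    // Replace the whole list in one call; this default performs the two steps
    // from the issue, and an implementation could override it with one write.
    default void update(List<T> values) {
        clear();
        for (T value : values) {
            add(value);
        }
    }
}

// Simple heap-backed implementation for illustration.
class InMemoryListState<T> implements SimpleListState<T> {
    private final List<T> elements = new ArrayList<>();

    @Override public Iterable<T> get() { return elements; }
    @Override public void add(T value) { elements.add(value); }
    @Override public void clear() { elements.clear(); }
}
```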
[jira] [Commented] (FLINK-7347) "removeAll" is extremely inefficient in MessageAcknowledgingSourceBase.notifyCheckpointComplete
[ https://issues.apache.org/jira/browse/FLINK-7347?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131967#comment-16131967 ] ASF GitHub Bot commented on FLINK-7347: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4468 Thanks a lot for contributing this fix, @ymost! I merged into master, could you please close this PR? > "removeAll" is extremely inefficient in > MessageAcknowledgingSourceBase.notifyCheckpointComplete > --- > > Key: FLINK-7347 > URL: https://issues.apache.org/jira/browse/FLINK-7347 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.3.1 >Reporter: Yonatan Most >Assignee: Yonatan Most > Fix For: 1.4.0 > > > Observe this line in > {{MessageAcknowledgingSourceBase.notifyCheckpointComplete}}: > {code} > idsProcessedButNotAcknowledged.removeAll(checkpoint.f1); > {code} > The implementation of {{removeAll}} is such that if the set is smaller than > the collection to remove, then the set is iterated and every item is checked > for containment in the collection. The type of {{checkpoint.f1}} here is > {{ArrayList}}, so the {{contains}} action is very inefficient, and it is > performed for every item in {{idsProcessedButNotAcknowledged}}. > In our pipeline we had about 10 million events processed, and the checkpoint > was stuck on the {{removeAll}} call for hours. > A simple solution is to make {{idsForCurrentCheckpoint}} a {{HashSet}} > instead of an {{ArrayList}}. The fact that it's a list is not really used > anywhere. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
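The cost difference comes from `AbstractSet.removeAll`, which calls `contains` on its argument once per element of the set whenever the set is not larger than the argument. The issue's fix changes the field to a `HashSet`. The sketch below, a hypothetical helper, instead copies the argument into a `HashSet` at the call site, only to illustrate why a hash-based lookup removes the quadratic behavior:

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

class AcknowledgedIds {
    // pending.removeAll(list) may call list.contains(...) once per pending id
    // when pending.size() <= list.size(), which is O(n * m) for an ArrayList.
    // Copying into a HashSet first makes each lookup O(1) on average.
    static <T> void removeAcknowledged(Set<T> pending, Collection<T> acknowledged) {
        Set<T> lookup = new HashSet<>(acknowledged);
        pending.removeAll(lookup);
    }
}
```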
[GitHub] flink issue #4454: [hotfix][docs] Add section in docs about writing unit/int...
Github user pnowojski commented on the issue: https://github.com/apache/flink/pull/4454 @twalthr, kind reminder :)
[GitHub] flink issue #4468: [FLINK-7347] [streaming] Keep ids for current checkpoint ...
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4468 Thanks a lot for contributing this fix, @ymost! I merged into master, could you please close this PR?
[jira] [Closed] (FLINK-7347) "removeAll" is extremely inefficient in MessageAcknowledgingSourceBase.notifyCheckpointComplete
[ https://issues.apache.org/jira/browse/FLINK-7347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek closed FLINK-7347. --- Resolution: Fixed Implemented in 76f1022884fe7b291fe81028a29896fb5b5ca5c9 on master. > "removeAll" is extremely inefficient in > MessageAcknowledgingSourceBase.notifyCheckpointComplete > --- > > Key: FLINK-7347 > URL: https://issues.apache.org/jira/browse/FLINK-7347 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.3.1 >Reporter: Yonatan Most >Assignee: Yonatan Most > Fix For: 1.4.0 > > > Observe this line in > {{MessageAcknowledgingSourceBase.notifyCheckpointComplete}}: > {code} > idsProcessedButNotAcknowledged.removeAll(checkpoint.f1); > {code} > The implementation of {{removeAll}} is such that if the set is smaller than > the collection to remove, then the set is iterated and every item is checked > for containment in the collection. The type of {{checkpoint.f1}} here is > {{ArrayList}}, so the {{contains}} action is very inefficient, and it is > performed for every item in {{idsProcessedButNotAcknowledged}}. > In our pipeline we had about 10 million events processed, and the checkpoint > was stuck on the {{removeAll}} call for hours. > A simple solution is to make {{idsForCurrentCheckpoint}} a {{HashSet}} > instead of an {{ArrayList}}. The fact that it's a list is not really used > anywhere. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (FLINK-7347) "removeAll" is extremely inefficient in MessageAcknowledgingSourceBase.notifyCheckpointComplete
[ https://issues.apache.org/jira/browse/FLINK-7347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek reassigned FLINK-7347: --- Assignee: Yonatan Most > "removeAll" is extremely inefficient in > MessageAcknowledgingSourceBase.notifyCheckpointComplete > --- > > Key: FLINK-7347 > URL: https://issues.apache.org/jira/browse/FLINK-7347 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.3.1 >Reporter: Yonatan Most >Assignee: Yonatan Most > Fix For: 1.4.0 > > > Observe this line in > {{MessageAcknowledgingSourceBase.notifyCheckpointComplete}}: > {code} > idsProcessedButNotAcknowledged.removeAll(checkpoint.f1); > {code} > The implementation of {{removeAll}} is such that if the set is smaller than > the collection to remove, then the set is iterated and every item is checked > for containment in the collection. The type of {{checkpoint.f1}} here is > {{ArrayList}}, so the {{contains}} action is very inefficient, and it is > performed for every item in {{idsProcessedButNotAcknowledged}}. > In our pipeline we had about 10 million events processed, and the checkpoint > was stuck on the {{removeAll}} call for hours. > A simple solution is to make {{idsForCurrentCheckpoint}} a {{HashSet}} > instead of an {{ArrayList}}. The fact that it's a list is not really used > anywhere. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7347) "removeAll" is extremely inefficient in MessageAcknowledgingSourceBase.notifyCheckpointComplete
[ https://issues.apache.org/jira/browse/FLINK-7347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-7347: Fix Version/s: 1.4.0 > "removeAll" is extremely inefficient in > MessageAcknowledgingSourceBase.notifyCheckpointComplete > --- > > Key: FLINK-7347 > URL: https://issues.apache.org/jira/browse/FLINK-7347 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.3.1 >Reporter: Yonatan Most > Fix For: 1.4.0 > > > Observe this line in > {{MessageAcknowledgingSourceBase.notifyCheckpointComplete}}: > {code} > idsProcessedButNotAcknowledged.removeAll(checkpoint.f1); > {code} > The implementation of {{removeAll}} is such that if the set is smaller than > the collection to remove, then the set is iterated and every item is checked > for containment in the collection. The type of {{checkpoint.f1}} here is > {{ArrayList}}, so the {{contains}} action is very inefficient, and it is > performed for every item in {{idsProcessedButNotAcknowledged}}. > In our pipeline we had about 10 million events processed, and the checkpoint > was stuck on the {{removeAll}} call for hours. > A simple solution is to make {{idsForCurrentCheckpoint}} a {{HashSet}} > instead of an {{ArrayList}}. The fact that it's a list is not really used > anywhere. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7465) Add build-in BloomFilterCount on TableAPI
[ https://issues.apache.org/jira/browse/FLINK-7465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131948#comment-16131948 ] Jark Wu commented on FLINK-7465: I think the problem is how to design the state. Even if it is a bit array or bitmap, it is expensive to de/serialize the state every time {{accumulate}} is called. > Add build-in BloomFilterCount on TableAPI > - > > Key: FLINK-7465 > URL: https://issues.apache.org/jira/browse/FLINK-7465 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: sunjincheng >Assignee: sunjincheng > Attachments: bloomfilter.png > > > In this JIRA, we use a BloomFilter to implement counting functions. > BloomFilter Algorithm description: > An empty Bloom filter is a bit array of m bits, all set to 0. There must also > be k different hash functions defined, each of which maps or hashes some set > element to one of the m array positions, generating a uniform random > distribution. Typically, k is a constant, much smaller than m, which is > proportional to the number of elements to be added; the precise choice of k > and the constant of proportionality of m are determined by the intended false > positive rate of the filter. > To add an element, feed it to each of the k hash functions to get k array > positions. Set the bits at all these positions to 1. > To query for an element (test whether it is in the set), feed it to each of > the k hash functions to get k array positions. If any of the bits at these > positions is 0, the element is definitely not in the set – if it were, then > all the bits would have been set to 1 when it was inserted. If all are 1, > then either the element is in the set, or the bits have by chance been set to > 1 during the insertion of other elements, resulting in a false positive. > An example of a Bloom filter, representing the set {x, y, z}. The colored > arrows show the positions in the bit array that each set element is mapped > to.
The element w is not in the set {x, y, z}, because it hashes to one > bit-array position containing 0. For this figure, m = 18 and k = 3. The > sketch is as follows: > !bloomfilter.png! > Reference: > 1. https://en.wikipedia.org/wiki/Bloom_filter > 2. > https://github.com/apache/hive/blob/master/storage-api/src/java/org/apache/hive/common/util/BloomFilter.java > Hi [~fhueske] [~twalthr] I would appreciate it if you could give me some advice. :-) -- This message was sent by Atlassian JIRA (v6.4.14#64029)
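The Bloom filter algorithm described in the issue, together with the counting idea from its title, can be sketched in Java. Assumptions: elements are `String`s, and the k probe positions come from double hashing of `String.hashCode()`, a simplification (stronger 64-bit hashes are typical in practice). The hypothetical `BloomFilterCount` increments only when the filter reports an element as definitely new, so false positives make it undercount:

```java
import java.util.BitSet;

// Minimal Bloom filter: m bits, k probe positions per element.
class SimpleBloomFilter {
    private final BitSet bits;
    private final int m;
    private final int k;

    SimpleBloomFilter(int m, int k) {
        if (m <= 0 || k <= 0) {
            throw new IllegalArgumentException("m and k must be positive");
        }
        this.bits = new BitSet(m);
        this.m = m;
        this.k = k;
    }

    // i-th probe position via double hashing: (h1 + i * h2) mod m.
    private int position(String element, int i) {
        int h1 = element.hashCode();
        int h2 = h1 * 0x9E3779B9; // multiplicative mix to derive a second hash
        return Math.floorMod(h1 + i * h2, m);
    }

    void add(String element) {
        for (int i = 0; i < k; i++) {
            bits.set(position(element, i));
        }
    }

    // false means "definitely not present"; true means "possibly present".
    boolean mightContain(String element) {
        for (int i = 0; i < k; i++) {
            if (!bits.get(position(element, i))) {
                return false;
            }
        }
        return true;
    }
}

// Approximate distinct counting: count an element only if it is definitely new.
class BloomFilterCount {
    private final SimpleBloomFilter filter;
    private long count;

    BloomFilterCount(int m, int k) {
        this.filter = new SimpleBloomFilter(m, k);
    }

    void accumulate(String element) {
        if (!filter.mightContain(element)) {
            filter.add(element);
            count++;
        }
    }

    long getValue() {
        return count;
    }
}
```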
[jira] [Commented] (FLINK-7402) Ineffective null check in NettyMessage#write()
[ https://issues.apache.org/jira/browse/FLINK-7402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131944#comment-16131944 ] ASF GitHub Bot commented on FLINK-7402: --- GitHub user zjureel opened a pull request: https://github.com/apache/flink/pull/4562 [FLINK-7402] Fix ineffective null check in NettyMessage#write() ## What is the purpose of the change Fix ineffective null check in NettyMessage#write() ## Brief change log *(for example:)* - *Add null check and remove unnecessary null check* ## Verifying this change *(Please pick either of the following options)* No test case ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? 
(not applicable / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/zjureel/flink FLINK-7402 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4562.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4562 commit e289b37cf8adce56876b47ee33ed161058bf64e6 Author: zjureel Date: 2017-08-18T06:43:34Z [FLINK-7402] Fix ineffective null check in NettyMessage#write() > Ineffective null check in NettyMessage#write() > -- > > Key: FLINK-7402 > URL: https://issues.apache.org/jira/browse/FLINK-7402 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Priority: Minor > > Here is the null check in the finally block: > {code} > finally { > if (buffer != null) { > buffer.recycle(); > } > {code} > But buffer has already been dereferenced in the try block without a guard. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
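The problem is that the `finally` block guards against `null` only after the `try` block has already dereferenced `buffer`. One way to restructure this, in the spirit of the PR's "add null check and remove unnecessary null check" summary, is to validate before the `try` and then recycle unconditionally. `RecyclableBuffer` and `BufferWriter` are hypothetical names, not Netty or Flink types:

```java
import java.util.Objects;
import java.util.function.ToIntFunction;

// Hypothetical stand-in for a recyclable network buffer.
interface RecyclableBuffer {
    int readableBytes();
    void recycle();
}

class BufferWriter {
    // Validate up front: the buffer is dereferenced inside the try block,
    // so a null check in finally would come too late to be useful.
    static int write(RecyclableBuffer buffer, ToIntFunction<RecyclableBuffer> body) {
        Objects.requireNonNull(buffer, "buffer must not be null");
        try {
            return body.applyAsInt(buffer);
        } finally {
            // buffer is known to be non-null here, so it is always recycled.
            buffer.recycle();
        }
    }
}
```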
[GitHub] flink pull request #4562: [FLINK-7402] Fix ineffective null check in NettyMe...
GitHub user zjureel opened a pull request: https://github.com/apache/flink/pull/4562 [FLINK-7402] Fix ineffective null check in NettyMessage#write() ## What is the purpose of the change Fix ineffective null check in NettyMessage#write() ## Brief change log *(for example:)* - *Add null check and remove unnecessary null check* ## Verifying this change *(Please pick either of the following options)* No test case ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/zjureel/flink FLINK-7402 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4562.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4562 commit e289b37cf8adce56876b47ee33ed161058bf64e6 Author: zjureel Date: 2017-08-18T06:43:34Z [FLINK-7402] Fix ineffective null check in NettyMessage#write()
[jira] [Commented] (FLINK-7442) Add option for using a child-first classloader for loading user code
[ https://issues.apache.org/jira/browse/FLINK-7442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131926#comment-16131926 ] ASF GitHub Bot commented on FLINK-7442: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4554 Probably, yes. I also don't know what other stuff this could break, though. For example, I don't know if queryable state, which starts netty (and Akka) stuff, will still work with this. Unfortunately we don't have any end-to-end tests for that. > Add option for using a child-first classloader for loading user code > > > Key: FLINK-7442 > URL: https://issues.apache.org/jira/browse/FLINK-7442 > Project: Flink > Issue Type: Improvement > Components: Local Runtime >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4554: [FLINK-7442] Add option for using child-first classloader...
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4554 Probably, yes. I also don't know what other stuff this could break, though. For example, I don't know if queryable state, which starts netty (and Akka) stuff, will still work with this. Unfortunately we don't have any end-to-end tests for that.
[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector
[ https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131913#comment-16131913 ] ASF GitHub Bot commented on FLINK-6988: --- Github user pnowojski commented on the issue: https://github.com/apache/flink/pull/4239 Implemented a fixed-size pool of producers, please check the last commit. If we run out of producers in the pool, an exception is thrown, aborting the ongoing snapshot. > Add Apache Kafka 0.11 connector > --- > > Key: FLINK-6988 > URL: https://issues.apache.org/jira/browse/FLINK-6988 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.3.1 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski > > Kafka 0.11 (it will be released very soon) adds support for transactions. > Thanks to that, Flink might be able to implement a Kafka sink supporting > "exactly-once" semantics. The API changes and the whole transaction support are > described in > [KIP-98|https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging]. > The goal is to mimic the implementation of the existing BucketingSink. The new > FlinkKafkaProducer011 would > * upon creation, begin a transaction, store the transaction identifiers in the > state and write all incoming data to an output Kafka topic using that > transaction > * on `snapshotState` call, flush the data and write to the state the > information that the current transaction is pending to be committed > * on `notifyCheckpointComplete`, commit this pending transaction > * in case of a crash between `snapshotState` and `notifyCheckpointComplete`, > either abort this pending transaction (if not every participant successfully > saved the snapshot) or restore and commit it. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
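The transaction lifecycle in the FLINK-6988 description can be modeled in a few lines. This is a hypothetical in-memory sketch, not Flink's `TwoPhaseCommitSinkFunction` or `FlinkKafkaProducer011`. Records go into the open transaction, `snapshotState` parks it as pending under the checkpoint id and opens a new one, and `notifyCheckpointComplete` commits every pending transaction up to that id. Recovery and abort are omitted:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of the two-phase commit steps from the description.
class TwoPhaseSinkModel {
    private List<String> currentTxn = new ArrayList<>();                 // open transaction
    private final TreeMap<Long, List<String>> pending = new TreeMap<>(); // pre-committed, by checkpoint id
    final List<String> committed = new ArrayList<>();                    // visible to readers

    // Incoming records are written into the currently open transaction.
    void invoke(String record) {
        currentTxn.add(record);
    }

    // snapshotState: pre-commit the open transaction, keep it as pending, begin a new one.
    void snapshotState(long checkpointId) {
        pending.put(checkpointId, currentTxn);
        currentTxn = new ArrayList<>();
    }

    // notifyCheckpointComplete: commit all pending transactions up to and including this id.
    void notifyCheckpointComplete(long checkpointId) {
        Iterator<Map.Entry<Long, List<String>>> it =
            pending.headMap(checkpointId, true).entrySet().iterator();
        while (it.hasNext()) {
            committed.addAll(it.next().getValue());
            it.remove(); // removes the entry from the backing map as well
        }
    }
}
```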
[GitHub] flink issue #4239: [FLINK-6988] flink-connector-kafka-0.11 with exactly-once...
Github user pnowojski commented on the issue: https://github.com/apache/flink/pull/4239 Implemented a fixed-size pool of producers; please check the last commit. If we run out of producers in the pool, an exception is thrown, aborting the ongoing snapshot.
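The pool behavior described in the comment (a bounded set of producers and a hard failure when none is free) can be sketched generically. This is a hypothetical illustration, not the connector's code: `FixedSizeProducerPool`, `acquire`/`release`, and the use of `IllegalStateException` are assumptions, and the type parameter `P` stands in for a transactional Kafka producer.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Supplier;

// Hypothetical fixed-size pool; P stands in for a transactional Kafka producer.
class FixedSizeProducerPool<P> {
    private final Deque<P> available = new ArrayDeque<>();

    // Creates all producers up front; the pool never grows beyond `size`.
    FixedSizeProducerPool(int size, Supplier<P> factory) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        for (int i = 0; i < size; i++) {
            available.push(factory.get());
        }
    }

    // Hands out a free producer for a new transaction, or fails the caller (e.g. the ongoing snapshot).
    synchronized P acquire() {
        P producer = available.poll();
        if (producer == null) {
            throw new IllegalStateException("producer pool exhausted, aborting snapshot");
        }
        return producer;
    }

    // Returns a producer once its transaction has been committed or aborted.
    synchronized void release(P producer) {
        available.push(producer);
    }

    synchronized int availableCount() {
        return available.size();
    }

    public static void main(String[] args) {
        int[] nextId = {0};
        FixedSizeProducerPool<String> pool =
                new FixedSizeProducerPool<>(2, () -> "producer-" + nextId[0]++);
        String first = pool.acquire();  // transaction of checkpoint 1, awaiting commit
        String second = pool.acquire(); // transaction of checkpoint 2, awaiting commit
        try {
            pool.acquire();             // a third in-flight transaction does not fit
        } catch (IllegalStateException e) {
            System.out.println("snapshot aborted: " + e.getMessage());
        }
        pool.release(first);            // checkpoint 1 committed, its producer is free again
        System.out.println(pool.availableCount()); // 1
    }
}
```

Failing fast instead of blocking means an exhausted pool costs one snapshot rather than stalling the task; under this design a later checkpoint can try again once a producer has been released.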
[jira] [Commented] (FLINK-7476) Try to recover from failure in TwoPhaseCommitSinkFunction.beginTransaction
[ https://issues.apache.org/jira/browse/FLINK-7476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131907#comment-16131907 ] ASF GitHub Bot commented on FLINK-7476:
---
GitHub user pnowojski opened a pull request: https://github.com/apache/flink/pull/4561
[FLINK-7476][streaming] Continue using previous transaction on failures
First commit comes from #4557.
Previously, when using TwoPhaseCommitSinkFunction, an intermittent failure in "beginTransaction" failed not only the snapshot that triggered the call but also every subsequent write request. This rendered such a sink unusable until the application was restarted.
This PR changes the order of execution of methods in a `PublicEvolving` class that has not been released yet. The PR is covered by the existing tests in `TwoPhaseCommitSinkFunctionTest` as well as by two additional test cases (`testContinueWorkOnBeginTransactionFailure` would have failed before this PR).
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/pnowojski/flink 2phase-recover
Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4561.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4561
commit 813c8121b406b60ea478f4765b33b7d75c221d1e
Author: Piotr Nowojski
Date: 2017-08-14T14:40:45Z
[hotfix][streaming] Simplify state of TwoPhaseCommitSinkFunction
commit 249b6419af3d15414dc411838aba624d0ee2f3a1
Author: Piotr Nowojski
Date: 2017-08-14T13:09:39Z
[hotfix][tests] Implement AutoCloseable in TestHarness
commit a0ae6324dcaded581a3352c8ff4bae6e86e01fde
Author: Piotr Nowojski
Date: 2017-08-17T13:46:47Z
[hotfix][streaming] Refactor TwoPhaseCommitSinkFunctionTest
commit e3c7dc83ccbce2505be5769fa0827b09dfa54875
Author: Piotr Nowojski
Date: 2017-08-17T10:29:16Z
[FLINK-7476][streaming] Continue using previous transaction on failures
Previously when using TwoPhaseCommitSinkFunction, if there was some intermittent failure in "beginTransaction", not only the snapshot that triggered this call failed, but also any subsequent write requests would fail also. This caused such sink unusable until application restart.
> Try to recover from failure in TwoPhaseCommitSinkFunction.beginTransaction
> --
>
> Key: FLINK-7476
> URL: https://issues.apache.org/jira/browse/FLINK-7476
> Project: Flink
> Issue Type: Bug
> Components: Streaming
> Affects Versions: 1.4.0
> Reporter: Piotr Nowojski
> Assignee: Piotr Nowojski
> Fix For: 1.4.0
>
> Currently, when using TwoPhaseCommitSinkFunction, an intermittent failure in "beginTransaction" fails not only the snapshot that triggered the call but also every subsequent write request, rendering such a sink unusable until the application is restarted.
> This issue is in code that hasn't been released yet.
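The PR text only says that the order of method calls changes. One ordering that produces the described behavior, sketched here as an assumption rather than the actual `TwoPhaseCommitSinkFunction` code, is to open the next transaction before touching the current one. `RecoveringTwoPhaseSink`, `FlakySink`, `RecoveringSinkDemo` and the simplified method signatures are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical simplification of a two-phase-commit sink; not Flink's TwoPhaseCommitSinkFunction.
abstract class RecoveringTwoPhaseSink<TXN> {
    final Map<Long, TXN> pendingCommit = new TreeMap<>();
    TXN currentTransaction;

    protected abstract TXN beginTransaction() throws Exception;
    protected abstract void write(TXN txn, String record) throws Exception;
    protected abstract void preCommit(TXN txn) throws Exception;
    protected abstract void abort(TXN txn);

    void open() throws Exception {
        currentTransaction = beginTransaction();
    }

    void invoke(String record) throws Exception {
        write(currentTransaction, record);
    }

    void snapshotState(long checkpointId) throws Exception {
        // 1. Open the next transaction first. If this throws, nothing has changed yet:
        //    only this snapshot fails and invoke() keeps writing to currentTransaction.
        TXN next = beginTransaction();
        // 2. Only then flush the current transaction and mark it as pending.
        try {
            preCommit(currentTransaction);
        } catch (Exception e) {
            abort(next); // do not leak the transaction opened in step 1
            throw e;
        }
        pendingCommit.put(checkpointId, currentTransaction);
        currentTransaction = next;
    }
}

// Test double whose beginTransaction() fails once on demand.
class FlakySink extends RecoveringTwoPhaseSink<List<String>> {
    boolean failNextBegin = false;

    @Override
    protected List<String> beginTransaction() throws Exception {
        if (failNextBegin) {
            failNextBegin = false;
            throw new Exception("intermittent failure");
        }
        return new ArrayList<>();
    }

    @Override
    protected void write(List<String> txn, String record) { txn.add(record); }

    @Override
    protected void preCommit(List<String> txn) { /* nothing to flush in memory */ }

    @Override
    protected void abort(List<String> txn) { txn.clear(); }
}

class RecoveringSinkDemo {
    public static void main(String[] args) throws Exception {
        FlakySink sink = new FlakySink();
        sink.open();
        sink.invoke("a");
        sink.failNextBegin = true;
        try {
            sink.snapshotState(1L);
        } catch (Exception e) {
            System.out.println("checkpoint 1 failed: " + e.getMessage());
        }
        sink.invoke("b"); // still works: same transaction as "a"
        sink.snapshotState(2L);
        System.out.println(sink.pendingCommit); // {2=[a, b]}
    }
}
```

Because nothing is pre-committed until a replacement transaction exists, a failed `beginTransaction()` costs one checkpoint instead of the whole sink; its records are carried into the next successful checkpoint.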
[GitHub] flink pull request #4561: [FLINK-7476][streaming] Continue using previous tr...
GitHub user pnowojski opened a pull request: https://github.com/apache/flink/pull/4561
[FLINK-7476][streaming] Continue using previous transaction on failures
First commit comes from #4557.
Previously, when using TwoPhaseCommitSinkFunction, an intermittent failure in "beginTransaction" failed not only the snapshot that triggered the call but also every subsequent write request. This rendered such a sink unusable until the application was restarted.
This PR changes the order of execution of methods in a `PublicEvolving` class that has not been released yet. The PR is covered by the existing tests in `TwoPhaseCommitSinkFunctionTest` as well as by two additional test cases (`testContinueWorkOnBeginTransactionFailure` would have failed before this PR).
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/pnowojski/flink 2phase-recover
Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4561.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4561
commit 813c8121b406b60ea478f4765b33b7d75c221d1e
Author: Piotr Nowojski
Date: 2017-08-14T14:40:45Z
[hotfix][streaming] Simplify state of TwoPhaseCommitSinkFunction
commit 249b6419af3d15414dc411838aba624d0ee2f3a1
Author: Piotr Nowojski
Date: 2017-08-14T13:09:39Z
[hotfix][tests] Implement AutoCloseable in TestHarness
commit a0ae6324dcaded581a3352c8ff4bae6e86e01fde
Author: Piotr Nowojski
Date: 2017-08-17T13:46:47Z
[hotfix][streaming] Refactor TwoPhaseCommitSinkFunctionTest
commit e3c7dc83ccbce2505be5769fa0827b09dfa54875
Author: Piotr Nowojski
Date: 2017-08-17T10:29:16Z
[FLINK-7476][streaming] Continue using previous transaction on failures
Previously when using TwoPhaseCommitSinkFunction, if there was some intermittent failure in "beginTransaction", not only the snapshot that triggered this call failed, but also any subsequent write requests would fail also. This caused such sink unusable until application restart.
[jira] [Updated] (FLINK-7476) Try to recover from failure in TwoPhaseCommitSinkFunction.beginTransaction
[ https://issues.apache.org/jira/browse/FLINK-7476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-7476: Affects Version/s: 1.4.0
[jira] [Updated] (FLINK-7476) Try to recover from failure in TwoPhaseCommitSinkFunction.beginTransaction
[ https://issues.apache.org/jira/browse/FLINK-7476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-7476: Fix Version/s: 1.4.0
[jira] [Commented] (FLINK-7442) Add option for using a child-first classloader for loading user code
[ https://issues.apache.org/jira/browse/FLINK-7442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131889#comment-16131889 ] ASF GitHub Bot commented on FLINK-7442:
---
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/4554
Ah, I see: that is probably related to reading resources from a JAR file (like the config) through methods such as `getResourceAsStream()`. I can see that the "delegate to parent after child" lookup logic might not quite work for that. I think the particular bug you encountered there is almost an argument for not changing the logic on the client yet: since we would run the entire Flink code through that classloader as well, the implications are trickier than in the runtime task, where we only instantiate the user-code functions.
> Add option for using a child-first classloader for loading user code
>
> Key: FLINK-7442
> URL: https://issues.apache.org/jira/browse/FLINK-7442
> Project: Flink
> Issue Type: Improvement
> Components: Local Runtime
> Reporter: Aljoscha Krettek
> Assignee: Aljoscha Krettek
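For context, a child-first loader has to override resource lookup as well as class loading; otherwise `getResourceAsStream()` keeps the default parent-first order while classes are resolved child-first, which is one way resource lookups from a user JAR can go wrong. The sketch below assumes a plain `URLClassLoader` subclass; the class name, the `alwaysParentFirstPrefixes` parameter and the `org.apache.flink.` prefix are illustrative, not the option that FLINK-7442 adds.

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.List;

// Hypothetical sketch of a child-first class loader; not Flink's actual implementation.
class ChildFirstClassLoader extends URLClassLoader {
    // Package prefixes that are always delegated to the parent (e.g. the framework's own API).
    private final String[] alwaysParentFirstPrefixes;

    ChildFirstClassLoader(URL[] userCodeUrls, ClassLoader parent, String[] alwaysParentFirstPrefixes) {
        super(userCodeUrls, parent);
        this.alwaysParentFirstPrefixes = alwaysParentFirstPrefixes;
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        synchronized (getClassLoadingLock(name)) {
            Class<?> c = findLoadedClass(name);
            if (c == null) {
                if (isParentFirst(name)) {
                    c = super.loadClass(name, false); // standard parent-first delegation
                } else {
                    try {
                        c = findClass(name); // look in the user-code URLs first
                    } catch (ClassNotFoundException e) {
                        c = super.loadClass(name, false); // then fall back to the parent
                    }
                }
            }
            if (resolve) {
                resolveClass(c);
            }
            return c;
        }
    }

    // Resources must be child-first too; URLClassLoader.getResourceAsStream() goes through getResource().
    @Override
    public URL getResource(String name) {
        URL url = findResource(name);
        return url != null ? url : super.getResource(name);
    }

    @Override
    public Enumeration<URL> getResources(String name) throws IOException {
        List<URL> urls = new ArrayList<>(Collections.list(findResources(name)));
        ClassLoader parent = getParent();
        if (parent != null) { // bootstrap-loader resources are skipped for brevity
            urls.addAll(Collections.list(parent.getResources(name)));
        }
        return Collections.enumeration(urls);
    }

    private boolean isParentFirst(String className) {
        if (className.startsWith("java.")) {
            return true; // JDK classes cannot be defined by a user class loader
        }
        for (String prefix : alwaysParentFirstPrefixes) {
            if (className.startsWith(prefix)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) throws Exception {
        ClassLoader app = ChildFirstClassLoader.class.getClassLoader();
        try (ChildFirstClassLoader loader =
                new ChildFirstClassLoader(new URL[0], app, new String[] {"org.apache.flink."})) {
            // No user-code URLs here, so every lookup falls back to the parent.
            System.out.println(loader.loadClass("java.lang.String") == String.class); // true
        }
    }
}
```

The parent-first prefix list is what would keep framework classes shared between the runtime and user code; on the client, where all of Flink's own code would also go through such a loader, that list is exactly the part that becomes hard to get right.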