[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask
[ https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16591218#comment-16591218 ] vinoyang commented on FLINK-10205: -- [~isunjin] Can you assign this issue to yourself? If not, you may need to apply for the Contributor permission first. > Batch Job: InputSplit Fault tolerant for DataSourceTask > --- > > Key: FLINK-10205 > URL: https://issues.apache.org/jira/browse/FLINK-10205 > Project: Flink > Issue Type: Sub-task > Components: JobManager >Reporter: JIN SUN >Priority: Major > Original Estimate: 168h > Remaining Estimate: 168h > > Today DataSource Task pull InputSplits from JobManager to achieve better > performance, however, when a DataSourceTask failed and rerun, it will not get > the same splits as its previous version. this will introduce inconsistent > result or even data corruption. > Furthermore, if there are two executions run at the same time (in batch > scenario), this two executions should process same splits. > we need to fix the issue to make the inputs of a DataSourceTask > deterministic. The propose is save all splits into ExecutionVertex and > DataSourceTask will pull split from there. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] asfgit closed pull request #6609: [hotfix][docs] missing breaking metrics page
asfgit closed pull request #6609: [hotfix][docs] missing breaking metrics page URL: https://github.com/apache/flink/pull/6609 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md index a93853c14dc..7d88a36393c 100644 --- a/docs/monitoring/metrics.md +++ b/docs/monitoring/metrics.md @@ -1486,6 +1486,7 @@ Thus, in order to infer the metric identifier: bytesRequestedPerFetch stream, shardId The bytes requested (2 Mbps / loopFrequencyHz) in a single call to getRecords. + Gauge This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-7243) Add ParquetInputFormat
[ https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16591208#comment-16591208 ] ASF GitHub Bot commented on FLINK-7243: --- docete commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format URL: https://github.com/apache/flink/pull/6483#discussion_r212531176 ## File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java ## @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.parquet; + +import org.apache.flink.api.common.io.CheckpointableInputFormat; +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.formats.parquet.utils.ParquetRecordReader; +import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter; +import org.apache.flink.formats.parquet.utils.RowReadSupport; +import org.apache.flink.metrics.Counter; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +import org.apache.parquet.ParquetReadOptions; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.apache.parquet.io.InputFile; +import org.apache.parquet.schema.MessageType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * The base InputFormat class to read from Parquet files. + * For specific return types the {@link #convert(Row)} method need to be implemented. + * + * Using {@link ParquetRecordReader} to Read files instead of {@link org.apache.flink.core.fs.FSDataInputStream}, + * we override {@link #open(FileInputSplit)} and {@link #close()} to change the behaviors. + * + * @param The type of record to read. + */ +public abstract class ParquetInputFormat extends FileInputFormat implements Review comment: Should we support filter pushdown? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add ParquetInputFormat > -- > > Key: FLINK-7243 > URL: https://issues.apache.org/jira/browse/FLINK-7243 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: godfrey he >Assignee: Zhenqiu Huang >Priority: Major > Labels: pull-request-available > > Add a {{ParquetInputFormat}} to read data from a Apache Parquet file. 
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-7243) Add ParquetInputFormat
[ https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-7243: -- Labels: pull-request-available (was: ) > Add ParquetInputFormat > -- > > Key: FLINK-7243 > URL: https://issues.apache.org/jira/browse/FLINK-7243 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: godfrey he >Assignee: Zhenqiu Huang >Priority: Major > Labels: pull-request-available > > Add a {{ParquetInputFormat}} to read data from a Apache Parquet file. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7243) Add ParquetInputFormat
[ https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16591206#comment-16591206 ] ASF GitHub Bot commented on FLINK-7243: --- docete commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format URL: https://github.com/apache/flink/pull/6483#discussion_r212530605 ## File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java ## @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.parquet.utils; + +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.MapTypeInfo; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; +import org.apache.flink.api.java.typeutils.RowTypeInfo; + +import org.apache.parquet.avro.AvroSchemaConverter; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.OriginalType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; +import org.apache.parquet.schema.Types; + +import java.lang.reflect.Array; +import java.util.ArrayList; +import java.util.List; + +/** + * Schema converter converts Parquet schema to and from Flink internal types. 
+ */ +public class ParquetSchemaConverter { + public static final String MAP_KEY = "key"; + public static final String MAP_VALUE = "value"; + public static final String LIST_ELEMENT = "array"; + public static final String MESSAGE_ROOT = "root"; + private static final AvroSchemaConverter SCHEMA_CONVERTER = new AvroSchemaConverter(); + + public static TypeInformation fromParquetType(MessageType type) { + return convertFields(type.getFields()); + } + + public static MessageType toParquetType(TypeInformation typeInformation) { + return (MessageType) convertField(null, typeInformation, Type.Repetition.OPTIONAL); + } + + private static TypeInformation convertFields(List parquetFields) { + List> types = new ArrayList<>(); + List names = new ArrayList<>(); + for (Type field : parquetFields) { + TypeInformation subType = convertField(field); + if (subType != null) { + types.add(subType); + names.add(field.getName()); + } + } + + return new RowTypeInfo(types.toArray(new TypeInformation[types.size()]), + names.toArray(new String[names.size()])); + } + + private static TypeInformation convertField(final Type fieldType) { + TypeInformation typeInfo = null; + if (fieldType.isPrimitive()) { + PrimitiveType primitiveType = fieldType.asPrimitiveType(); Review comment: How about support more types, SqlTimeTypeInfo, Decimal, etc This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add ParquetInputFormat > -- > > Key: FLINK-7243 > URL: https://issues.apache.org/jira/browse/FLINK-7243 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: godfrey he >Assignee: Zhenqiu Huang >Priority: Major > Labels: pull-request-available > > Add a {{ParquetInputFormat}} to read data from a Apache Parquet file. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7243) Add ParquetInputFormat
[ https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16591207#comment-16591207 ] ASF GitHub Bot commented on FLINK-7243: --- docete commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format URL: https://github.com/apache/flink/pull/6483#discussion_r212530857 ## File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetRowInputFormat.java ## @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.parquet; + +import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.core.fs.Path; +import org.apache.flink.types.Row; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * A subclass of {@link ParquetInputFormat} to read from Parquet files and convert to {@link Row}. + * It is mainly used to integrate with table API and batch SQL. + */ +public class ParquetRowInputFormat extends ParquetInputFormat implements ResultTypeQueryable { Review comment: Offer a tablesource for SQL/Table API ? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add ParquetInputFormat > -- > > Key: FLINK-7243 > URL: https://issues.apache.org/jira/browse/FLINK-7243 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: godfrey he >Assignee: Zhenqiu Huang >Priority: Major > Labels: pull-request-available > > Add a {{ParquetInputFormat}} to read data from a Apache Parquet file. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] docete commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format
docete commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format URL: https://github.com/apache/flink/pull/6483#discussion_r212530605 ## File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java ## @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.parquet.utils; + +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.MapTypeInfo; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; +import org.apache.flink.api.java.typeutils.RowTypeInfo; + +import org.apache.parquet.avro.AvroSchemaConverter; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.OriginalType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; +import org.apache.parquet.schema.Types; + +import java.lang.reflect.Array; +import java.util.ArrayList; +import java.util.List; + +/** + * Schema converter converts Parquet schema to and from Flink internal types. + */ +public class ParquetSchemaConverter { + public static final String MAP_KEY = "key"; + public static final String MAP_VALUE = "value"; + public static final String LIST_ELEMENT = "array"; + public static final String MESSAGE_ROOT = "root"; + private static final AvroSchemaConverter SCHEMA_CONVERTER = new AvroSchemaConverter(); + + public static TypeInformation fromParquetType(MessageType type) { + return convertFields(type.getFields()); + } + + public static MessageType toParquetType(TypeInformation typeInformation) { + return (MessageType) convertField(null, typeInformation, Type.Repetition.OPTIONAL); + } + + private static TypeInformation convertFields(List parquetFields) { + List> types = new ArrayList<>(); + List names = new ArrayList<>(); + for (Type field : parquetFields) { + TypeInformation subType = convertField(field); + if (subType != null) { + types.add(subType); + names.add(field.getName()); + } + } + + return new RowTypeInfo(types.toArray(new TypeInformation[types.size()]), + names.toArray(new String[names.size()])); + } + + private static TypeInformation convertField(final Type fieldType) { + TypeInformation typeInfo = null; + if (fieldType.isPrimitive()) { + PrimitiveType primitiveType = fieldType.asPrimitiveType(); Review comment: How about support more types, SqlTimeTypeInfo, Decimal, etc This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
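As a companion to the review question above, the suggestion to handle more types (SqlTimeTypeInfo, Decimal, etc.) could look roughly like the sketch below. This is not the code under review; the mapping choices, the class name, and the omission of precision/scale handling for DECIMAL are illustrative assumptions only.
```java
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType;

/**
 * Illustrative sketch: map a Parquet primitive type plus its OriginalType
 * annotation to a Flink TypeInformation, covering the extra types mentioned
 * in the review (dates, timestamps, decimals). Not the PR's actual code.
 */
public class PrimitiveTypeMappingSketch {

    public static TypeInformation<?> mapPrimitive(PrimitiveType primitiveType) {
        OriginalType annotation = primitiveType.getOriginalType();
        switch (primitiveType.getPrimitiveTypeName()) {
            case BINARY:
                if (annotation == OriginalType.UTF8) {
                    return BasicTypeInfo.STRING_TYPE_INFO;
                }
                if (annotation == OriginalType.DECIMAL) {
                    // Real code would also carry precision/scale from the schema.
                    return BasicTypeInfo.BIG_DEC_TYPE_INFO;
                }
                return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
            case INT32:
                if (annotation == OriginalType.DATE) {
                    return SqlTimeTypeInfo.DATE;
                }
                return BasicTypeInfo.INT_TYPE_INFO;
            case INT64:
                if (annotation == OriginalType.TIMESTAMP_MILLIS) {
                    return SqlTimeTypeInfo.TIMESTAMP;
                }
                return BasicTypeInfo.LONG_TYPE_INFO;
            case BOOLEAN:
                return BasicTypeInfo.BOOLEAN_TYPE_INFO;
            case FLOAT:
                return BasicTypeInfo.FLOAT_TYPE_INFO;
            case DOUBLE:
                return BasicTypeInfo.DOUBLE_TYPE_INFO;
            default:
                throw new IllegalArgumentException("Unsupported Parquet type: " + primitiveType);
        }
    }
}
```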
[GitHub] docete commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format
docete commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format URL: https://github.com/apache/flink/pull/6483#discussion_r212531176 ## File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java ## @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.parquet; + +import org.apache.flink.api.common.io.CheckpointableInputFormat; +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.formats.parquet.utils.ParquetRecordReader; +import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter; +import org.apache.flink.formats.parquet.utils.RowReadSupport; +import org.apache.flink.metrics.Counter; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +import org.apache.parquet.ParquetReadOptions; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.apache.parquet.io.InputFile; +import org.apache.parquet.schema.MessageType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * The base InputFormat class to read from Parquet files. + * For specific return types the {@link #convert(Row)} method need to be implemented. + * + * Using {@link ParquetRecordReader} to Read files instead of {@link org.apache.flink.core.fs.FSDataInputStream}, + * we override {@link #open(FileInputSplit)} and {@link #close()} to change the behaviors. + * + * @param The type of record to read. + */ +public abstract class ParquetInputFormat extends FileInputFormat implements Review comment: Should we support filter pushdown? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] docete commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format
docete commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format URL: https://github.com/apache/flink/pull/6483#discussion_r212530857 ## File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetRowInputFormat.java ## @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.parquet; + +import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.core.fs.Path; +import org.apache.flink.types.Row; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * A subclass of {@link ParquetInputFormat} to read from Parquet files and convert to {@link Row}. + * It is mainly used to integrate with table API and batch SQL. + */ +public class ParquetRowInputFormat extends ParquetInputFormat implements ResultTypeQueryable { Review comment: Offer a tablesource for SQL/Table API ? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10202) Enable configuration for state.checkpoint.dir in StreamExecutionEnvironment
[ https://issues.apache.org/jira/browse/FLINK-10202?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16591188#comment-16591188 ] buptljy commented on FLINK-10202: - [~StephanEwen] Here is my code. I think the checkpoint directory in CheckpointCoordinator is still empty even if you set the directory in FsStateBackend. {code:java} val env = StreamExecutionEnvironment.getExecutionEnvironment env.enableCheckpointing(15000) env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) env.setStateBackend(new FsStateBackend(checkpointDir)){code} 1. If I don't set state.checkpoint.dir, it will throw the exception above. 2. If I remove the third line, it will be okay. > Enable configuration for state.checkpoint.dir in StreamExecutionEnvironment > --- > > Key: FLINK-10202 > URL: https://issues.apache.org/jira/browse/FLINK-10202 > Project: Flink > Issue Type: Improvement >Reporter: buptljy >Priority: Major > > Usually we can set state.checkpoint.dir in flink-conf.yaml, but sometimes we > run a flink job locally, and we're not able to set state.checkpoint.dir for > background wrapped cluster, which will cause > {code:java} > throw new IllegalStateException("CheckpointConfig says to persist periodic " + > "checkpoints, but no checkpoint directory has been configured. You can > " + > "configure configure one via key '" + > ConfigConstants.CHECKPOINTS_DIRECTORY_KEY + "'."); > {code} > I wonder if we could provide a public method in *StreamExecutionEnvironment* > so that developers can use it to set state.checkpoint.dir for job. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
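Until such a setter exists, one possible workaround for local runs (shown here in Java, as a sketch rather than a verified recipe) is to pass the checkpoint directory through the Configuration accepted by createLocalEnvironment, so that the embedded cluster also sees it; the path below and the literal key "state.checkpoints.dir" are assumptions to adapt to your setup.
{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalCheckpointDirExample {
    public static void main(String[] args) throws Exception {
        // Hypothetical local path; replace with a real directory.
        final String checkpointDir = "file:///tmp/flink-checkpoints";

        // Put the directory into the cluster configuration so the embedded
        // JobManager / CheckpointCoordinator sees it, not only the state backend.
        Configuration conf = new Configuration();
        conf.setString("state.checkpoints.dir", checkpointDir);

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(1, conf);
        env.enableCheckpointing(15000);
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.setStateBackend(new FsStateBackend(checkpointDir));

        // ... define the actual pipeline here, then call env.execute(...)
    }
}
{code}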
[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask
[ https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16591066#comment-16591066 ] JIN SUN commented on FLINK-10205: - I would like to work on this issue > Batch Job: InputSplit Fault tolerant for DataSourceTask > --- > > Key: FLINK-10205 > URL: https://issues.apache.org/jira/browse/FLINK-10205 > Project: Flink > Issue Type: Sub-task > Components: JobManager >Reporter: JIN SUN >Priority: Major > Original Estimate: 168h > Remaining Estimate: 168h > > Today DataSource Task pull InputSplits from JobManager to achieve better > performance, however, when a DataSourceTask failed and rerun, it will not get > the same splits as its previous version. this will introduce inconsistent > result or even data corruption. > Furthermore, if there are two executions run at the same time (in batch > scenario), this two executions should process same splits. > we need to fix the issue to make the inputs of a DataSourceTask > deterministic. The propose is save all splits into ExecutionVertex and > DataSourceTask will pull split from there. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask
JIN SUN created FLINK-10205: --- Summary: Batch Job: InputSplit Fault tolerant for DataSourceTask Key: FLINK-10205 URL: https://issues.apache.org/jira/browse/FLINK-10205 Project: Flink Issue Type: Sub-task Components: JobManager Reporter: JIN SUN Today the DataSourceTask pulls InputSplits from the JobManager to achieve better performance. However, when a DataSourceTask fails and is rerun, it will not get the same splits as its previous attempt, which can introduce inconsistent results or even data corruption. Furthermore, if two executions run at the same time (in the batch scenario), both executions should process the same splits. We need to fix this to make the inputs of a DataSourceTask deterministic. The proposal is to save all splits in the ExecutionVertex and have the DataSourceTask pull its splits from there. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
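To make the proposed behaviour concrete, here is a minimal, hypothetical sketch; the class and method names are invented for illustration and are not Flink's actual ExecutionVertex/JobManager classes. The idea is that split assignments are recorded per execution vertex, so a restarted or speculative attempt of the same vertex replays exactly the same splits.
{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative sketch of deterministic, replayable split assignment. */
public class DeterministicSplitAssigner<S> {

    private final List<S> unassigned;
    private final Map<Integer, List<S>> assignedPerVertex = new HashMap<>();

    public DeterministicSplitAssigner(List<S> allSplits) {
        this.unassigned = new ArrayList<>(allSplits);
    }

    /**
     * Returns the split with the given sequence number for the given vertex.
     * A re-executed attempt asking for a split it already received gets the
     * recorded one back; otherwise a fresh split is assigned and remembered.
     */
    public synchronized S getNextSplit(int vertexIndex, int attemptSplitIndex) {
        List<S> history =
                assignedPerVertex.computeIfAbsent(vertexIndex, k -> new ArrayList<>());
        if (attemptSplitIndex < history.size()) {
            return history.get(attemptSplitIndex); // replay for the rerun attempt
        }
        if (unassigned.isEmpty()) {
            return null; // no more input
        }
        S split = unassigned.remove(0);
        history.add(split);
        return split;
    }
}
{code}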
[jira] [Updated] (FLINK-10204) Job is marked as FAILED after serialization exception
[ https://issues.apache.org/jira/browse/FLINK-10204?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-10204: --- Labels: pull-request-available (was: ) > Job is marked as FAILED after serialization exception > - > > Key: FLINK-10204 > URL: https://issues.apache.org/jira/browse/FLINK-10204 > Project: Flink > Issue Type: Bug >Reporter: Ben La Monica >Priority: Major > Labels: pull-request-available > > We have a long running flink job that eventually fails and is shut down due > to an internal serialization exception that we keep on getting. Here is the > stack trace: > {code:java} > 2018-08-23 18:39:48,199 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph - Job NAV Estimation > (4b5463d76167f9f5aac83a890e8a867d) switched from state FAILING to FAILED. > java.io.IOException: Corrupt stream, found tag: 127 > at > org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:219) > at > org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:49) > at > org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55) > at > org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:140) > at > org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:206) > at > org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:115) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703) > at java.lang.Thread.run(Thread.java:748){code} > > I think I have tracked down the issue to a mismatch in the > serialization/deserialization/copy code in the StreamElementSerializer with > regards to the LATENCY_MARKER. > The Serialization logic writes 3 LONGs and an INT. The copy logic only writes > (and reads) a LONG and 2 INTs. Adding a test for the LatencyMarker throws an > EOFException, and fixing the copy code causes the test to pass again. > I've written a unit test that highlights the problem, and have written the > code to correct it. > I'll submit a PR that goes along with it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] benlamonica opened a new pull request #6610: [FLINK-10204] - fix serialization/copy error for LatencyMarker records.
benlamonica opened a new pull request #6610: [FLINK-10204] - fix serialization/copy error for LatencyMarker records. URL: https://github.com/apache/flink/pull/6610 *Thank you very much for contributing to Apache Flink - we are happy that you want to help us improve Flink. To help the community review your contribution in the best possible way, please go through the checklist below, which will get the contribution into a shape in which it can be best reviewed.* *Please understand that we do not do this to make contributions to Flink a hassle. In order to uphold a high standard of quality for code contributions, while at the same time managing a large number of contributions, we need contributors to prepare the contributions well, and give reviewers enough contextual information for the review. Please also understand that contributions that do not follow this guide will take longer to review and thus typically be picked up with lower priority by the community.* ## Contribution Checklist - Make sure that the pull request corresponds to a [JIRA issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are made for typos in JavaDoc or documentation files, which need no JIRA issue. - Name the pull request in the form "[FLINK-] [component] Title of the pull request", where *FLINK-* should be replaced by the actual issue number. Skip *component* if you are unsure about which is the best component. Typo fixes that have no associated JIRA issue should be named following this pattern: `[hotfix] [docs] Fix typo in event time introduction` or `[hotfix] [javadocs] Expand JavaDoc for PuncuatedWatermarkGenerator`. - Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review. - Make sure that the change passes the automated tests, i.e., `mvn clean verify` passes. You can set up Travis CI to do that following [this guide](http://flink.apache.org/contribute-code.html#best-practices). - Each pull request should address only one issue, not mix up code from multiple issues. - Each commit in the pull request has a meaningful commit message (including the JIRA id) - Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below. **(The sections below can be removed for hotfixes of typos)** ## What is the purpose of the change Fixes a serialization error with regards to the LatencyMarker not having the same binary format when serializing, and when copying the byte stream. The following exception occurs after a long running flink job has been running for a while: `2018-08-23 18:39:48,199 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph- Job NAV Estimation (4b5463d76167f9f5aac83a890e8a867d) switched from state FAILING to FAILED. 
java.io.IOException: Corrupt stream, found tag: 127 at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:219) at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:49) at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55) at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:140) at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:206) at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:115) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703) at java.lang.Thread.run(Thread.java:748)` ## Brief change log - *Added LatencyMarker to the StreamElementSerializerTest and then fixed the failure by correcting the StreamElementSerializer::copy method. ## Verifying this change This change added tests and can be verified as follows: - * Commenting out the fix in the class will show that when serializing and deserializing, you will encounter an EOFException due to the incorrect copy* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes (modified the equals/hashCode on the LatencyMarker class so that the test would pass) - The serializers: yes - The runtime per-record code paths (performance sensitive): don't know - Anything that affects deployment or recovery: JobManager (and its com
[jira] [Commented] (FLINK-10204) Job is marked as FAILED after serialization exception
[ https://issues.apache.org/jira/browse/FLINK-10204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590863#comment-16590863 ] ASF GitHub Bot commented on FLINK-10204: benlamonica opened a new pull request #6610: [FLINK-10204] - fix serialization/copy error for LatencyMarker records. URL: https://github.com/apache/flink/pull/6610 *Thank you very much for contributing to Apache Flink - we are happy that you want to help us improve Flink. To help the community review your contribution in the best possible way, please go through the checklist below, which will get the contribution into a shape in which it can be best reviewed.* *Please understand that we do not do this to make contributions to Flink a hassle. In order to uphold a high standard of quality for code contributions, while at the same time managing a large number of contributions, we need contributors to prepare the contributions well, and give reviewers enough contextual information for the review. Please also understand that contributions that do not follow this guide will take longer to review and thus typically be picked up with lower priority by the community.* ## Contribution Checklist - Make sure that the pull request corresponds to a [JIRA issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are made for typos in JavaDoc or documentation files, which need no JIRA issue. - Name the pull request in the form "[FLINK-] [component] Title of the pull request", where *FLINK-* should be replaced by the actual issue number. Skip *component* if you are unsure about which is the best component. Typo fixes that have no associated JIRA issue should be named following this pattern: `[hotfix] [docs] Fix typo in event time introduction` or `[hotfix] [javadocs] Expand JavaDoc for PuncuatedWatermarkGenerator`. - Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review. - Make sure that the change passes the automated tests, i.e., `mvn clean verify` passes. You can set up Travis CI to do that following [this guide](http://flink.apache.org/contribute-code.html#best-practices). - Each pull request should address only one issue, not mix up code from multiple issues. - Each commit in the pull request has a meaningful commit message (including the JIRA id) - Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below. **(The sections below can be removed for hotfixes of typos)** ## What is the purpose of the change Fixes a serialization error with regards to the LatencyMarker not having the same binary format when serializing, and when copying the byte stream. The following exception occurs after a long running flink job has been running for a while: `2018-08-23 18:39:48,199 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph- Job NAV Estimation (4b5463d76167f9f5aac83a890e8a867d) switched from state FAILING to FAILED. 
java.io.IOException: Corrupt stream, found tag: 127 at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:219) at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:49) at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55) at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:140) at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:206) at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:115) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703) at java.lang.Thread.run(Thread.java:748)` ## Brief change log - *Added LatencyMarker to the StreamElementSerializerTest and then fixed the failure by correcting the StreamElementSerializer::copy method. ## Verifying this change This change added tests and can be verified as follows: - * Commenting out the fix in the class will show that when serializing and deserializing, you will encounter an EOFException due to the incorrect copy* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes (modified
[jira] [Created] (FLINK-10204) Job is marked as FAILED after serialization exception
Ben La Monica created FLINK-10204: - Summary: Job is marked as FAILED after serialization exception Key: FLINK-10204 URL: https://issues.apache.org/jira/browse/FLINK-10204 Project: Flink Issue Type: Bug Reporter: Ben La Monica We have a long running flink job that eventually fails and is shut down due to an internal serialization exception that we keep on getting. Here is the stack trace: {code:java} 2018-08-23 18:39:48,199 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job NAV Estimation (4b5463d76167f9f5aac83a890e8a867d) switched from state FAILING to FAILED. java.io.IOException: Corrupt stream, found tag: 127 at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:219) at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:49) at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55) at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:140) at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:206) at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:115) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703) at java.lang.Thread.run(Thread.java:748){code} I think I have tracked down the issue to a mismatch in the serialization/deserialization/copy code in the StreamElementSerializer with regards to the LATENCY_MARKER. The Serialization logic writes 3 LONGs and an INT. The copy logic only writes (and reads) a LONG and 2 INTs. Adding a test for the LatencyMarker throws an EOFException, and fixing the copy code causes the test to pass again. I've written a unit test that highlights the problem, and have written the code to correct it. I'll submit a PR that goes along with it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
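The mismatch described here (serialization writes three longs and an int, while the copy path transfers a long and two ints) can be illustrated with a stand-alone snippet. The field layout below follows the issue description and is not Flink's actual StreamElementSerializer code; it only demonstrates why serialize and copy must agree field for field.
{code:java}
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/** Stand-alone illustration of a serialize/copy field-layout mismatch. */
public class CopyMismatchDemo {

    static byte[] serializeMarker(long markedTime, long opIdLower, long opIdUpper, int subtask)
            throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeLong(markedTime);
        out.writeLong(opIdLower);
        out.writeLong(opIdUpper);
        out.writeInt(subtask);
        return bos.toByteArray();
    }

    /** Correct copy: reads exactly what serializeMarker wrote (3 longs + 1 int). */
    static void copyMarker(DataInputStream in, DataOutputStream out) throws IOException {
        out.writeLong(in.readLong()); // marked time
        out.writeLong(in.readLong()); // operator id (lower part)
        out.writeLong(in.readLong()); // operator id (upper part)
        out.writeInt(in.readInt());   // subtask index
        // A copy that transfers fewer or differently sized fields leaves bytes
        // behind, so the next record starts mid-field and the stream is corrupt.
    }

    public static void main(String[] args) throws IOException {
        byte[] record = serializeMarker(42L, 1L, 2L, 3);
        ByteArrayOutputStream copied = new ByteArrayOutputStream();
        copyMarker(new DataInputStream(new ByteArrayInputStream(record)),
                new DataOutputStream(copied));
        System.out.println("copied " + copied.size() + " of " + record.length + " bytes");
    }
}
{code}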
[jira] [Commented] (FLINK-8532) RebalancePartitioner should use Random value for its first partition
[ https://issues.apache.org/jira/browse/FLINK-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590701#comment-16590701 ] ASF GitHub Bot commented on FLINK-8532: --- Guibo-Pan edited a comment on issue #6544: [FLINK-8532] [Streaming] modify RebalancePartitioner to use a random partition as its first partition URL: https://github.com/apache/flink/pull/6544#issuecomment-415499383 Hi @StephanEwen, your suggestion led me to think about this more deeply, and that extreme performance is exactly what we want. I would like to ask you for more suggestions. I prefer to initialize the partitioner instance with a random partition; however, in the current design the partitioner does not know the target range. The alternative is like this: ``` private final int[] returnArray = new int[] {ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE - 1)}; @Override public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record, int numberOfOutputChannels) { this.returnArray[0] = (this.returnArray[0] + 1) % numberOfOutputChannels; return this.returnArray; } ``` Please tell me what you think, thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > RebalancePartitioner should use Random value for its first partition > > > Key: FLINK-8532 > URL: https://issues.apache.org/jira/browse/FLINK-8532 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Yuta Morisawa >Assignee: Guibo Pan >Priority: Major > Labels: pull-request-available > > In some conditions, RebalancePartitioner doesn't balance data correctly > because it use the same value for selecting next operators. > RebalancePartitioner initializes its partition id using the same value in > every threads, so it indeed balances data, but at one moment the amount of > data in each operator is skew. > Particularly, when the data rate of former operators is equal , data skew > becomes severe. > > > Example: > Consider a simple operator chain. > -> map1 -> rebalance -> map2 -> > Each map operator(map1, map2) contains three subtasks(subtask 1, 2, 3, 4, 5, > 6). > map1 map2 > st1 st4 > st2 st5 > st3 st6 > > At the beginning, every subtasks in map1 sends data to st4 in map2 because > they use the same initial parition id. > Next time the map1 receive data st1,2,3 send data to st5 because they > increment its partition id when they processed former data. > In my environment, it takes twice the time to process data when I use > RebalancePartitioner as long as I use other partitioners(rescale, keyby). > > To solve this problem, in my opinion, RebalancePartitioner should use its own > operator id for the initial value. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] Guibo-Pan edited a comment on issue #6544: [FLINK-8532] [Streaming] modify RebalancePartitioner to use a random partition as its first partition
Guibo-Pan edited a comment on issue #6544: [FLINK-8532] [Streaming] modify RebalancePartitioner to use a random partition as its first partition URL: https://github.com/apache/flink/pull/6544#issuecomment-415499383 Hi @StephanEwen, your suggestion led me to think about this more deeply, and that extreme performance is exactly what we want. I would like to ask you for more suggestions. I prefer to initialize the partitioner instance with a random partition; however, in the current design the partitioner does not know the target range. The alternative is like this: ``` private final int[] returnArray = new int[] {ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE - 1)}; @Override public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record, int numberOfOutputChannels) { this.returnArray[0] = (this.returnArray[0] + 1) % numberOfOutputChannels; return this.returnArray; } ``` Please tell me what you think, thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
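For context, the variant being discussed (seed the round-robin index lazily, once the channel count is known) can be sketched as a stand-alone class. This is not Flink's StreamPartitioner API; the class and method names are made up for illustration.
```java
import java.util.concurrent.ThreadLocalRandom;

/** Sketch: round-robin channel selection starting from a random per-instance offset. */
public class RandomStartRebalanceSketch {

    private int nextChannel = -1; // -1 means "not initialized yet"

    public int selectChannel(int numberOfOutputChannels) {
        if (nextChannel < 0) {
            // Initialize lazily, once the target range is actually known,
            // so parallel upstream tasks do not all start at channel 0.
            nextChannel = ThreadLocalRandom.current().nextInt(numberOfOutputChannels);
        } else {
            nextChannel = (nextChannel + 1) % numberOfOutputChannels;
        }
        return nextChannel;
    }
}
```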
[jira] [Commented] (FLINK-10186) FindBugs warnings: Random object created and used only once
[ https://issues.apache.org/jira/browse/FLINK-10186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590696#comment-16590696 ] ASF GitHub Bot commented on FLINK-10186: Guibo-Pan commented on a change in pull request #6591: [FLINK-10186] Fix FindBugs warnings: Random object created and used o… URL: https://github.com/apache/flink/pull/6591#discussion_r212410516 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java ## @@ -115,7 +115,7 @@ public BufferSpiller(IOManager ioManager, int pageSize) throws IOException { this.tempDir = tempDirs[DIRECTORY_INDEX.getAndIncrement() % tempDirs.length]; byte[] rndBytes = new byte[32]; - new Random().nextBytes(rndBytes); Review comment: It's inside the constructor, and only generates random bytes once per instance. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > FindBugs warnings: Random object created and used only once > --- > > Key: FLINK-10186 > URL: https://issues.apache.org/jira/browse/FLINK-10186 > Project: Flink > Issue Type: Bug > Components: Streaming >Reporter: Hiroaki Yoshida >Priority: Major > Labels: pull-request-available > > FindBugs-3.0.1 ([http://findbugs.sourceforge.net/]) reported a > DMI_RANDOM_USED_ONLY_ONCE warning on master: > {code:java} > H B DMI: Random object created and used only once in new > org.apache.flink.streaming.runtime.io.BufferSpiller(IOManager, int) At > BufferSpiller.java:[line 118] > {code} > The description of the bug is as follows: > {quote}*DMI: Random object created and used only once > (DMI_RANDOM_USED_ONLY_ONCE)* > This code creates a java.util.Random object, uses it to generate one random > number, and then discards the Random object. This produces mediocre quality > random numbers and is inefficient. If possible, rewrite the code so that the > Random object is created once and saved, and each time a new random number is > required invoke a method on the existing Random object to obtain it. > If it is important that the generated Random numbers not be guessable, you > must not create a new Random for each random number; the values are too > easily guessable. You should strongly consider using a > java.security.SecureRandom instead (and avoid allocating a new SecureRandom > for each random number needed). > [http://findbugs.sourceforge.net/bugDescriptions.html#DMI_RANDOM_USED_ONLY_ONCE] > {quote} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] Guibo-Pan commented on a change in pull request #6591: [FLINK-10186] Fix FindBugs warnings: Random object created and used o…
Guibo-Pan commented on a change in pull request #6591: [FLINK-10186] Fix FindBugs warnings: Random object created and used o… URL: https://github.com/apache/flink/pull/6591#discussion_r212410516 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java ## @@ -115,7 +115,7 @@ public BufferSpiller(IOManager ioManager, int pageSize) throws IOException { this.tempDir = tempDirs[DIRECTORY_INDEX.getAndIncrement() % tempDirs.length]; byte[] rndBytes = new byte[32]; - new Random().nextBytes(rndBytes); Review comment: It's inside the constructor, and only generates random bytes once per instance. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
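For reference, the change FindBugs suggests boils down to reusing a thread-local Random instead of allocating a new one for a single call; whether BufferSpiller actually needs this is what the thread above debates. A minimal stand-alone illustration (class name invented for the example):
```java
import java.util.concurrent.ThreadLocalRandom;

/** Minimal illustration of avoiding DMI_RANDOM_USED_ONLY_ONCE. */
public class RandomBytesExample {
    public static void main(String[] args) {
        byte[] rndBytes = new byte[32];
        // ThreadLocalRandom is created once per thread and reused, so no
        // throwaway Random instance is allocated for a single nextBytes call.
        ThreadLocalRandom.current().nextBytes(rndBytes);
        System.out.println("first byte: " + rndBytes[0]);
    }
}
```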
[jira] [Commented] (FLINK-10203) Support truncate method for old Hadoop versions in HadoopRecoverableFsDataOutputStream
[ https://issues.apache.org/jira/browse/FLINK-10203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590683#comment-16590683 ] Kostas Kloudas commented on FLINK-10203: No problem [~artsem.semianenka]! I assigned the issue to you. > Support truncate method for old Hadoop versions in > HadoopRecoverableFsDataOutputStream > -- > > Key: FLINK-10203 > URL: https://issues.apache.org/jira/browse/FLINK-10203 > Project: Flink > Issue Type: Bug > Components: DataStream API, filesystem-connector >Affects Versions: 1.6.0, 1.6.1, 1.7.0 >Reporter: Artsem Semianenka >Assignee: Artsem Semianenka >Priority: Major > Labels: pull-request-available > > New StreamingFileSink ( introduced in 1.6 Flink version ) use > HadoopRecoverableFsDataOutputStream wrapper to write data in HDFS. > HadoopRecoverableFsDataOutputStream is a wrapper for FSDataOutputStream to > have an ability to restore from certain point of file after failure and > continue write data. To achieve this recover functionality the > HadoopRecoverableFsDataOutputStream use "truncate" method which was > introduced only in Hadoop 2.7 . > Unfortunately there are a few official Hadoop distributive which latest > version still use Hadoop 2.6 (This distributives: Cloudera, Pivotal HD ). As > the result Flinks Hadoop connector can't work with this distributives. > Flink declares that supported Hadoop from version 2.4.0 upwards > ([https://ci.apache.org/projects/flink/flink-docs-release-1.6/start/building.html#hadoop-versions]) > I guess we should emulate the functionality of "truncate" method for older > Hadoop versions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-10203) Support truncate method for old Hadoop versions in HadoopRecoverableFsDataOutputStream
[ https://issues.apache.org/jira/browse/FLINK-10203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kostas Kloudas reassigned FLINK-10203: -- Assignee: Artsem Semianenka > Support truncate method for old Hadoop versions in > HadoopRecoverableFsDataOutputStream > -- > > Key: FLINK-10203 > URL: https://issues.apache.org/jira/browse/FLINK-10203 > Project: Flink > Issue Type: Bug > Components: DataStream API, filesystem-connector >Affects Versions: 1.6.0, 1.6.1, 1.7.0 >Reporter: Artsem Semianenka >Assignee: Artsem Semianenka >Priority: Major > Labels: pull-request-available > > New StreamingFileSink ( introduced in 1.6 Flink version ) use > HadoopRecoverableFsDataOutputStream wrapper to write data in HDFS. > HadoopRecoverableFsDataOutputStream is a wrapper for FSDataOutputStream to > have an ability to restore from certain point of file after failure and > continue write data. To achieve this recover functionality the > HadoopRecoverableFsDataOutputStream use "truncate" method which was > introduced only in Hadoop 2.7 . > Unfortunately there are a few official Hadoop distributive which latest > version still use Hadoop 2.6 (This distributives: Cloudera, Pivotal HD ). As > the result Flinks Hadoop connector can't work with this distributives. > Flink declares that supported Hadoop from version 2.4.0 upwards > ([https://ci.apache.org/projects/flink/flink-docs-release-1.6/start/building.html#hadoop-versions]) > I guess we should emulate the functionality of "truncate" method for older > Hadoop versions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] Guibo-Pan commented on a change in pull request #6591: [FLINK-10186] Fix FindBugs warnings: Random object created and used o…
Guibo-Pan commented on a change in pull request #6591: [FLINK-10186] Fix FindBugs warnings: Random object created and used o… URL: https://github.com/apache/flink/pull/6591#discussion_r212410516 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java ## @@ -115,7 +115,7 @@ public BufferSpiller(IOManager ioManager, int pageSize) throws IOException { this.tempDir = tempDirs[DIRECTORY_INDEX.getAndIncrement() % tempDirs.length]; byte[] rndBytes = new byte[32]; - new Random().nextBytes(rndBytes); Review comment: It's inside the constructor, and it only generates random bytes once per instance. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
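For context, the FindBugs rule is satisfied either by keeping a single saved Random around or by drawing from a shared generator. A minimal sketch of the latter, assuming the 32 bytes here do not need to be cryptographically strong (this is just one possible fix, not necessarily what the PR merges):

```java
import java.util.concurrent.ThreadLocalRandom;

// Inside the BufferSpiller constructor: reuse the per-thread generator instead of
// allocating a new java.util.Random for a single nextBytes() call.
byte[] rndBytes = new byte[32];
ThreadLocalRandom.current().nextBytes(rndBytes);
```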
[jira] [Comment Edited] (FLINK-10203) Support truncate method for old Hadoop versions in HadoopRecoverableFsDataOutputStream
[ https://issues.apache.org/jira/browse/FLINK-10203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590627#comment-16590627 ] Artsem Semianenka edited comment on FLINK-10203 at 8/23/18 6:24 PM: I'm so sorry guys, this is my first time when I try to submit an issue into Apache project. But I guess I have no enough rights to assign this ticket to myself. was (Author: artsem.semianenka): I'm so sorry guys, this is my first time when I try to submit an issue into Apache project. > Support truncate method for old Hadoop versions in > HadoopRecoverableFsDataOutputStream > -- > > Key: FLINK-10203 > URL: https://issues.apache.org/jira/browse/FLINK-10203 > Project: Flink > Issue Type: Bug > Components: DataStream API, filesystem-connector >Affects Versions: 1.6.0, 1.6.1, 1.7.0 >Reporter: Artsem Semianenka >Priority: Major > Labels: pull-request-available > > New StreamingFileSink ( introduced in 1.6 Flink version ) use > HadoopRecoverableFsDataOutputStream wrapper to write data in HDFS. > HadoopRecoverableFsDataOutputStream is a wrapper for FSDataOutputStream to > have an ability to restore from certain point of file after failure and > continue write data. To achieve this recover functionality the > HadoopRecoverableFsDataOutputStream use "truncate" method which was > introduced only in Hadoop 2.7 . > Unfortunately there are a few official Hadoop distributive which latest > version still use Hadoop 2.6 (This distributives: Cloudera, Pivotal HD ). As > the result Flinks Hadoop connector can't work with this distributives. > Flink declares that supported Hadoop from version 2.4.0 upwards > ([https://ci.apache.org/projects/flink/flink-docs-release-1.6/start/building.html#hadoop-versions]) > I guess we should emulate the functionality of "truncate" method for older > Hadoop versions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10203) Support truncate method for old Hadoop versions in HadoopRecoverableFsDataOutputStream
[ https://issues.apache.org/jira/browse/FLINK-10203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590627#comment-16590627 ] Artsem Semianenka commented on FLINK-10203: --- I'm so sorry guys, this is my first time when I try to submit an issue into Apache project. > Support truncate method for old Hadoop versions in > HadoopRecoverableFsDataOutputStream > -- > > Key: FLINK-10203 > URL: https://issues.apache.org/jira/browse/FLINK-10203 > Project: Flink > Issue Type: Bug > Components: DataStream API, filesystem-connector >Affects Versions: 1.6.0, 1.6.1, 1.7.0 >Reporter: Artsem Semianenka >Priority: Major > Labels: pull-request-available > > New StreamingFileSink ( introduced in 1.6 Flink version ) use > HadoopRecoverableFsDataOutputStream wrapper to write data in HDFS. > HadoopRecoverableFsDataOutputStream is a wrapper for FSDataOutputStream to > have an ability to restore from certain point of file after failure and > continue write data. To achieve this recover functionality the > HadoopRecoverableFsDataOutputStream use "truncate" method which was > introduced only in Hadoop 2.7 . > Unfortunately there are a few official Hadoop distributive which latest > version still use Hadoop 2.6 (This distributives: Cloudera, Pivotal HD ). As > the result Flinks Hadoop connector can't work with this distributives. > Flink declares that supported Hadoop from version 2.4.0 upwards > ([https://ci.apache.org/projects/flink/flink-docs-release-1.6/start/building.html#hadoop-versions]) > I guess we should emulate the functionality of "truncate" method for older > Hadoop versions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8532) RebalancePartitioner should use Random value for its first partition
[ https://issues.apache.org/jira/browse/FLINK-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590614#comment-16590614 ] ASF GitHub Bot commented on FLINK-8532: --- Guibo-Pan commented on issue #6544: [FLINK-8532] [Streaming] modify RebalancePartitioner to use a random partition as its first partition URL: https://github.com/apache/flink/pull/6544#issuecomment-415513913 I updated the code like mentioned above just now. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > RebalancePartitioner should use Random value for its first partition > > > Key: FLINK-8532 > URL: https://issues.apache.org/jira/browse/FLINK-8532 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Yuta Morisawa >Assignee: Guibo Pan >Priority: Major > Labels: pull-request-available > > In some conditions, RebalancePartitioner doesn't balance data correctly > because it use the same value for selecting next operators. > RebalancePartitioner initializes its partition id using the same value in > every threads, so it indeed balances data, but at one moment the amount of > data in each operator is skew. > Particularly, when the data rate of former operators is equal , data skew > becomes severe. > > > Example: > Consider a simple operator chain. > -> map1 -> rebalance -> map2 -> > Each map operator(map1, map2) contains three subtasks(subtask 1, 2, 3, 4, 5, > 6). > map1 map2 > st1 st4 > st2 st5 > st3 st6 > > At the beginning, every subtasks in map1 sends data to st4 in map2 because > they use the same initial parition id. > Next time the map1 receive data st1,2,3 send data to st5 because they > increment its partition id when they processed former data. > In my environment, it takes twice the time to process data when I use > RebalancePartitioner as long as I use other partitioners(rescale, keyby). > > To solve this problem, in my opinion, RebalancePartitioner should use its own > operator id for the initial value. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] Guibo-Pan commented on issue #6544: [FLINK-8532] [Streaming] modify RebalancePartitioner to use a random partition as its first partition
Guibo-Pan commented on issue #6544: [FLINK-8532] [Streaming] modify RebalancePartitioner to use a random partition as its first partition URL: https://github.com/apache/flink/pull/6544#issuecomment-415513913 I updated the code like mentioned above just now. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-9626) Possible resource leak in FileSystem
[ https://issues.apache.org/jira/browse/FLINK-9626?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590612#comment-16590612 ] Stephan Ewen commented on FLINK-9626: - Note, that this happens only if there is no other FS factory in the classpath, which is not commonly the case. So in practice, this should not really happen outside test setups. > Possible resource leak in FileSystem > > > Key: FLINK-9626 > URL: https://issues.apache.org/jira/browse/FLINK-9626 > Project: Flink > Issue Type: Bug > Components: FileSystem >Affects Versions: 1.5.0 >Reporter: Piotr Nowojski >Priority: Critical > > There is a potential resource leak in > org.apache.flink.core.fs.FileSystem#getUnguardedFileSystem. > Inside it there is a code: > > {code:java} > // this "default" initialization makes sure that the FileSystem class works > // even when not configured with an explicit Flink configuration, like on > // JobManager or TaskManager setup > if (FS_FACTORIES.isEmpty()) { >initialize(new Configuration()); > } > {code} > which is executed on each cache miss. However this initialize method is also > doing > > > {code:java} > CACHE.clear(); > {code} > without closing file systems in CACHE (this could be problematic for > HadoopFileSystem which is a wrapper around org.apache.hadoop.fs.FileSystem > which is closable). > Now if for example we are constantly accessing two different file systems > (file systems are differentiated by combination of [schema and > authority|https://en.wikipedia.org/wiki/Uniform_Resource_Identifier#Generic_syntax] > part from the file system's URI) initialized from FALLBACK_FACTORY, each > time we call getUnguardedFileSystem for one of them, that call will clear > from CACHE entry for the other one. Thus we will constantly be creating new > FileSystems without closing them. > Solution could be to either not clear the CACHE or make sure that FileSystems > are properly closed. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
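Of the two fixes the ticket mentions, closing the cached file systems before evicting them could look roughly like the sketch below. The cache key type and the getHadoopFileSystem() accessor are assumptions made for this illustration, not a patch against the actual FileSystem class:

```java
// Hypothetical eviction helper: close what can be closed, then drop the entries.
// Assumes CACHE maps a (scheme, authority) key to the wrapped FileSystem instances.
private static void closeAndClearCache(Map<FSKey, FileSystem> cache) {
    for (FileSystem fs : cache.values()) {
        if (fs instanceof HadoopFileSystem) {
            try {
                // HadoopFileSystem wraps a closable org.apache.hadoop.fs.FileSystem
                ((HadoopFileSystem) fs).getHadoopFileSystem().close();
            } catch (IOException e) {
                // best effort: keep closing the remaining entries
            }
        }
    }
    cache.clear();
}
```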
[jira] [Commented] (FLINK-10203) Support truncate method for old Hadoop versions in HadoopRecoverableFsDataOutputStream
[ https://issues.apache.org/jira/browse/FLINK-10203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590607#comment-16590607 ] Kostas Kloudas commented on FLINK-10203: Hi [~artsem.semianenka]! Given that you are working on the issue, you should also assign it to yourself. In addition, I link the discussion from the mailing list, so that we keep track of everything https://mail-archives.apache.org/mod_mbox/flink-dev/201808.mbox/%3cf7af3ef2-1407-443e-a092-7b13a2467...@data-artisans.com%3E Discussions are also better to happen in JIRA, before submitting PRs to Github. This is not only a personal opinion but I believe it is also a requirement from Apache as this is also Apache space, while Github is not. > Support truncate method for old Hadoop versions in > HadoopRecoverableFsDataOutputStream > -- > > Key: FLINK-10203 > URL: https://issues.apache.org/jira/browse/FLINK-10203 > Project: Flink > Issue Type: Bug > Components: DataStream API, filesystem-connector >Affects Versions: 1.6.0, 1.6.1, 1.7.0 >Reporter: Artsem Semianenka >Priority: Major > Labels: pull-request-available > > New StreamingFileSink ( introduced in 1.6 Flink version ) use > HadoopRecoverableFsDataOutputStream wrapper to write data in HDFS. > HadoopRecoverableFsDataOutputStream is a wrapper for FSDataOutputStream to > have an ability to restore from certain point of file after failure and > continue write data. To achieve this recover functionality the > HadoopRecoverableFsDataOutputStream use "truncate" method which was > introduced only in Hadoop 2.7 . > Unfortunately there are a few official Hadoop distributive which latest > version still use Hadoop 2.6 (This distributives: Cloudera, Pivotal HD ). As > the result Flinks Hadoop connector can't work with this distributives. > Flink declares that supported Hadoop from version 2.4.0 upwards > ([https://ci.apache.org/projects/flink/flink-docs-release-1.6/start/building.html#hadoop-versions]) > I guess we should emulate the functionality of "truncate" method for older > Hadoop versions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-10203) Support truncate method for old Hadoop versions in HadoopRecoverableFsDataOutputStream
[ https://issues.apache.org/jira/browse/FLINK-10203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kostas Kloudas reassigned FLINK-10203: -- Assignee: (was: Kostas Kloudas) > Support truncate method for old Hadoop versions in > HadoopRecoverableFsDataOutputStream > -- > > Key: FLINK-10203 > URL: https://issues.apache.org/jira/browse/FLINK-10203 > Project: Flink > Issue Type: Bug > Components: DataStream API, filesystem-connector >Affects Versions: 1.6.0, 1.6.1, 1.7.0 >Reporter: Artsem Semianenka >Priority: Major > Labels: pull-request-available > > New StreamingFileSink ( introduced in 1.6 Flink version ) use > HadoopRecoverableFsDataOutputStream wrapper to write data in HDFS. > HadoopRecoverableFsDataOutputStream is a wrapper for FSDataOutputStream to > have an ability to restore from certain point of file after failure and > continue write data. To achieve this recover functionality the > HadoopRecoverableFsDataOutputStream use "truncate" method which was > introduced only in Hadoop 2.7 . > Unfortunately there are a few official Hadoop distributive which latest > version still use Hadoop 2.6 (This distributives: Cloudera, Pivotal HD ). As > the result Flinks Hadoop connector can't work with this distributives. > Flink declares that supported Hadoop from version 2.4.0 upwards > ([https://ci.apache.org/projects/flink/flink-docs-release-1.6/start/building.html#hadoop-versions]) > I guess we should emulate the functionality of "truncate" method for older > Hadoop versions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-10203) Support truncate method for old Hadoop versions in HadoopRecoverableFsDataOutputStream
[ https://issues.apache.org/jira/browse/FLINK-10203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kostas Kloudas reassigned FLINK-10203: -- Assignee: Kostas Kloudas > Support truncate method for old Hadoop versions in > HadoopRecoverableFsDataOutputStream > -- > > Key: FLINK-10203 > URL: https://issues.apache.org/jira/browse/FLINK-10203 > Project: Flink > Issue Type: Bug > Components: DataStream API, filesystem-connector >Affects Versions: 1.6.0, 1.6.1, 1.7.0 >Reporter: Artsem Semianenka >Assignee: Kostas Kloudas >Priority: Major > Labels: pull-request-available > > New StreamingFileSink ( introduced in 1.6 Flink version ) use > HadoopRecoverableFsDataOutputStream wrapper to write data in HDFS. > HadoopRecoverableFsDataOutputStream is a wrapper for FSDataOutputStream to > have an ability to restore from certain point of file after failure and > continue write data. To achieve this recover functionality the > HadoopRecoverableFsDataOutputStream use "truncate" method which was > introduced only in Hadoop 2.7 . > Unfortunately there are a few official Hadoop distributive which latest > version still use Hadoop 2.6 (This distributives: Cloudera, Pivotal HD ). As > the result Flinks Hadoop connector can't work with this distributives. > Flink declares that supported Hadoop from version 2.4.0 upwards > ([https://ci.apache.org/projects/flink/flink-docs-release-1.6/start/building.html#hadoop-versions]) > I guess we should emulate the functionality of "truncate" method for older > Hadoop versions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10195) RabbitMQ Source With Checkpointing Doesn't Backpressure Correctly
[ https://issues.apache.org/jira/browse/FLINK-10195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590591#comment-16590591 ] Stephan Ewen commented on FLINK-10195: -- Is that something that can be configured on RabbitMQ, for example a capacity bound on the queue in the QueuingConsumer? > RabbitMQ Source With Checkpointing Doesn't Backpressure Correctly > - > > Key: FLINK-10195 > URL: https://issues.apache.org/jira/browse/FLINK-10195 > Project: Flink > Issue Type: Bug > Components: RabbitMQ Connector >Affects Versions: 1.4.0, 1.5.0, 1.5.1, 1.6.0 >Reporter: Luka Jurukovski >Priority: Major > > The connection between the RabbitMQ server and the client does not > appropriately back pressure when auto acking is disabled. This becomes very > problematic when a downstream process throttles the data processing to slower > then RabbitMQ sends the data to the client. > The difference in records ends up being stored in the flink's heap space, > which grows indefinitely (or technically to "Integer Max" Deliveries). > Looking at RabbitMQ's metrics the number of unacked messages looks like > steadily rising saw tooth shape. > Upon further invesitgation it looks like this is due to how the > QueueingConsumer works, messages are added to the BlockingQueue faster then > they are being removed and processed, resulting in the previously described > behavior. > This may be intended behavior, however this isn't explicitly obvious in the > documentation or any of the examples I have seen. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
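On the RabbitMQ side, the number of unacknowledged deliveries a channel will buffer can indeed be bounded with a prefetch count (basic.qos). A minimal sketch of that knob, independent of how the Flink connector would expose it (the broker address and prefetch value are only examples):

```java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class PrefetchBoundExample {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost"); // assumption: broker address for this sketch
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // basic.qos caps the unacknowledged deliveries the broker pushes on this
        // channel; delivery pauses until some of them are acked, which bounds how
        // much can pile up in a consumer-side queue.
        channel.basicQos(500);

        // ... register the consumer and start consuming here ...

        channel.close();
        connection.close();
    }
}
```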
[jira] [Commented] (FLINK-10203) Support truncate method for old Hadoop versions in HadoopRecoverableFsDataOutputStream
[ https://issues.apache.org/jira/browse/FLINK-10203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590588#comment-16590588 ] ASF GitHub Bot commented on FLINK-10203: StephanEwen commented on issue #6608: [FLINK-10203]Support truncate method for old Hadoop versions in HadoopRecoverableFsDataOutputStream URL: https://github.com/apache/flink/pull/6608#issuecomment-415507439 I see, that is a fair reason. There are parallel efforts to add Parquet support to the old BucketingSink, but I see the point. Before going into a deep review, can you update the description with how exactly the legacy truncater should be working: what copy and rename steps it does and how it behaves under failure / repeated calls. Also, I would suggest to name it `Truncater` rather than `TruncateManager`. Too many managers all around already ;-) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Support truncate method for old Hadoop versions in > HadoopRecoverableFsDataOutputStream > -- > > Key: FLINK-10203 > URL: https://issues.apache.org/jira/browse/FLINK-10203 > Project: Flink > Issue Type: Bug > Components: DataStream API, filesystem-connector >Affects Versions: 1.6.0, 1.6.1, 1.7.0 >Reporter: Artsem Semianenka >Priority: Major > Labels: pull-request-available > > New StreamingFileSink ( introduced in 1.6 Flink version ) use > HadoopRecoverableFsDataOutputStream wrapper to write data in HDFS. > HadoopRecoverableFsDataOutputStream is a wrapper for FSDataOutputStream to > have an ability to restore from certain point of file after failure and > continue write data. To achieve this recover functionality the > HadoopRecoverableFsDataOutputStream use "truncate" method which was > introduced only in Hadoop 2.7 . > Unfortunately there are a few official Hadoop distributive which latest > version still use Hadoop 2.6 (This distributives: Cloudera, Pivotal HD ). As > the result Flinks Hadoop connector can't work with this distributives. > Flink declares that supported Hadoop from version 2.4.0 upwards > ([https://ci.apache.org/projects/flink/flink-docs-release-1.6/start/building.html#hadoop-versions]) > I guess we should emulate the functionality of "truncate" method for older > Hadoop versions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] StephanEwen commented on issue #6608: [FLINK-10203]Support truncate method for old Hadoop versions in HadoopRecoverableFsDataOutputStream
StephanEwen commented on issue #6608: [FLINK-10203]Support truncate method for old Hadoop versions in HadoopRecoverableFsDataOutputStream URL: https://github.com/apache/flink/pull/6608#issuecomment-415507439 I see, that is a fair reason. There are parallel efforts to add Parquet support to the old BucketingSink, but I see the point. Before going into a deep review, can you update the description with how exactly the legacy truncater should be working: what copy and rename steps it does and how it behaves under failure / repeated calls. Also, I would suggest to name it `Truncater` rather than `TruncateManager`. Too many managers all around already ;-) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
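For readers following the thread, the copy-and-rename approach that the comment asks to have spelled out can, in its simplest form, look like the sketch below. It is written directly against the plain Hadoop FileSystem API, ignores the failure/retry semantics the reviewer raises, and is not the code in this PR:

```java
import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

final class LegacyTruncate {

    /** Emulates truncate(file, length) on Hadoop < 2.7 by copying the prefix and renaming. */
    static void truncate(FileSystem fs, Path file, long length) throws IOException {
        Path tmp = new Path(file.getParent(), file.getName() + ".truncated.tmp");
        try (FSDataInputStream in = fs.open(file);
             FSDataOutputStream out = fs.create(tmp, true)) {
            byte[] buffer = new byte[4096];
            long remaining = length;
            while (remaining > 0) {
                int read = in.read(buffer, 0, (int) Math.min(buffer.length, remaining));
                if (read < 0) {
                    break;
                }
                out.write(buffer, 0, read);
                remaining -= read;
            }
        }
        // HDFS rename does not overwrite, so the original has to be deleted first.
        // A failure between these two calls is exactly the window a real recoverable
        // writer has to handle on repeated recovery attempts.
        if (!fs.delete(file, false) || !fs.rename(tmp, file)) {
            throw new IOException("Could not replace " + file + " with its truncated copy");
        }
    }

    private LegacyTruncate() {}
}
```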
[jira] [Commented] (FLINK-10202) Enable configuration for state.checkpoint.dir in StreamExecutionEnvironment
[ https://issues.apache.org/jira/browse/FLINK-10202?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590577#comment-16590577 ] Stephan Ewen commented on FLINK-10202: -- You can do that by calling {{env.setStateBackend(new FsStateBackend(checkpointDir))}}. > Enable configuration for state.checkpoint.dir in StreamExecutionEnvironment > --- > > Key: FLINK-10202 > URL: https://issues.apache.org/jira/browse/FLINK-10202 > Project: Flink > Issue Type: Improvement >Reporter: buptljy >Priority: Major > > Usually we can set state.checkpoint.dir in flink-conf.yaml, but sometimes we > run a flink job locally, and we're not able to set state.checkpoint.dir for > background wrapped cluster, which will cause > {code:java} > throw new IllegalStateException("CheckpointConfig says to persist periodic " + > "checkpoints, but no checkpoint directory has been configured. You can > " + > "configure configure one via key '" + > ConfigConstants.CHECKPOINTS_DIRECTORY_KEY + "'."); > {code} > I wonder if we could provide a public method in *StreamExecutionEnvironment* > so that developers can use it to set state.checkpoint.dir for job. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
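Concretely, the suggestion above amounts to something like the following in the job's main method (the local URI and checkpoint interval are only examples):

```java
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalCheckpointDirExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Points checkpoint data and metadata at an explicit directory,
        // so no checkpoint directory entry in flink-conf.yaml is needed.
        env.setStateBackend(new FsStateBackend("file:///tmp/flink-checkpoints"));
        env.enableCheckpointing(10_000); // checkpoint every 10 seconds

        // ... build and execute the pipeline here ...
    }
}
```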
[GitHub] alpinegizmo opened a new pull request #6609: [hotfix][docs] missing breaking metrics page
alpinegizmo opened a new pull request #6609: [hotfix][docs] missing breaking metrics page URL: https://github.com/apache/flink/pull/6609 Adding a missing `` to metrics.md. In both [master](https://ci.apache.org/projects/flink/flink-docs-master/monitoring/metrics.html#kinesis-connectors) and [release-1.6](https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/metrics.html#kinesis-connectors) the Kinesis Connectors section of the metrics doc page is broken, and appears like this: Kinesis Connectors ## Latency tracking Flink allows to track the latency of rec... This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-8532) RebalancePartitioner should use Random value for its first partition
[ https://issues.apache.org/jira/browse/FLINK-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590561#comment-16590561 ] ASF GitHub Bot commented on FLINK-8532: --- Guibo-Pan commented on issue #6544: [FLINK-8532] [Streaming] modify RebalancePartitioner to use a random partition as its first partition URL: https://github.com/apache/flink/pull/6544#issuecomment-415499383 Hi @StephanEwen , your suggestion lead me to deep thinks, and the extreme performance is exactly what we want. I am going to ask you for more suggestions. I prefer to initialize the partitioner instance with a random partition, however in the design ahead, the partitioner doesn't know the target range. The alternative is like this: ``` private final int[] returnArray = new int[] {new Random().nextInt(Integer.MAX_VALUE - 1)}; @Override public int[] selectChannels(SerializationDelegate> record, int numberOfOutputChannels) { this.returnArray[0] = (this.returnArray[0] + 1) % numberOfOutputChannels; return this.returnArray; } ``` Please tell me how you think, thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > RebalancePartitioner should use Random value for its first partition > > > Key: FLINK-8532 > URL: https://issues.apache.org/jira/browse/FLINK-8532 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Yuta Morisawa >Assignee: Guibo Pan >Priority: Major > Labels: pull-request-available > > In some conditions, RebalancePartitioner doesn't balance data correctly > because it use the same value for selecting next operators. > RebalancePartitioner initializes its partition id using the same value in > every threads, so it indeed balances data, but at one moment the amount of > data in each operator is skew. > Particularly, when the data rate of former operators is equal , data skew > becomes severe. > > > Example: > Consider a simple operator chain. > -> map1 -> rebalance -> map2 -> > Each map operator(map1, map2) contains three subtasks(subtask 1, 2, 3, 4, 5, > 6). > map1 map2 > st1 st4 > st2 st5 > st3 st6 > > At the beginning, every subtasks in map1 sends data to st4 in map2 because > they use the same initial parition id. > Next time the map1 receive data st1,2,3 send data to st5 because they > increment its partition id when they processed former data. > In my environment, it takes twice the time to process data when I use > RebalancePartitioner as long as I use other partitioners(rescale, keyby). > > To solve this problem, in my opinion, RebalancePartitioner should use its own > operator id for the initial value. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] Guibo-Pan commented on issue #6544: [FLINK-8532] [Streaming] modify RebalancePartitioner to use a random partition as its first partition
Guibo-Pan commented on issue #6544: [FLINK-8532] [Streaming] modify RebalancePartitioner to use a random partition as its first partition URL: https://github.com/apache/flink/pull/6544#issuecomment-415499383 Hi @StephanEwen , your suggestion led me to think about this more deeply, and maximum performance is exactly what we want. I would like to ask you for more suggestions. I prefer to initialize the partitioner instance with a random partition; however, in the current design the partitioner doesn't know the target range up front. The alternative looks like this:
```
private final int[] returnArray = new int[] {new Random().nextInt(Integer.MAX_VALUE - 1)};

@Override
public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record, int numberOfOutputChannels) {
    this.returnArray[0] = (this.returnArray[0] + 1) % numberOfOutputChannels;
    return this.returnArray;
}
```
Please tell me what you think, thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
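One way to reconcile a random first partition with the fact that the partitioner does not know the channel count up front is to defer the choice to the first selectChannels call. A hedged sketch of that idea (the surrounding class and the java.util.concurrent.ThreadLocalRandom import are omitted; this is not necessarily what the PR ends up doing):

```java
private final int[] returnArray = new int[] { -1 };

@Override
public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record, int numberOfOutputChannels) {
    if (returnArray[0] < 0) {
        // first record: the channel count is known now, so pick a random start channel
        returnArray[0] = ThreadLocalRandom.current().nextInt(numberOfOutputChannels);
    } else {
        returnArray[0] = (returnArray[0] + 1) % numberOfOutputChannels;
    }
    return returnArray;
}
```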
[jira] [Commented] (FLINK-10201) The batchTestUtil was mistakenly used in some stream sql tests
[ https://issues.apache.org/jira/browse/FLINK-10201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590544#comment-16590544 ] ASF GitHub Bot commented on FLINK-10201: fhueske commented on issue #6605: [FLINK-10201] [table] [test] The batchTestUtil was mistakenly used in some stream sql tests URL: https://github.com/apache/flink/pull/6605#issuecomment-415496307 Thanks for the PR @xccui. +1 to merge This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > The batchTestUtil was mistakenly used in some stream sql tests > -- > > Key: FLINK-10201 > URL: https://issues.apache.org/jira/browse/FLINK-10201 > Project: Flink > Issue Type: Test > Components: Table API & SQL >Reporter: Xingcan Cui >Assignee: Xingcan Cui >Priority: Minor > Labels: pull-request-available > > The {{batchTestUtil}} was mistakenly used in stream sql tests > {{SetOperatorsTest.testValuesWithCast()}} and > {{CorrelateTest.testLeftOuterJoinAsSubQuery()}}. That should be fixed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] fhueske commented on issue #6605: [FLINK-10201] [table] [test] The batchTestUtil was mistakenly used in some stream sql tests
fhueske commented on issue #6605: [FLINK-10201] [table] [test] The batchTestUtil was mistakenly used in some stream sql tests URL: https://github.com/apache/flink/pull/6605#issuecomment-415496307 Thanks for the PR @xccui. +1 to merge This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10011) Old job resurrected during HA failover
[ https://issues.apache.org/jira/browse/FLINK-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590520#comment-16590520 ] ASF GitHub Bot commented on FLINK-10011: azagrebin commented on a change in pull request #6587: [FLINK-10011] Release JobGraph from SubmittedJobGraphStore URL: https://github.com/apache/flink/pull/6587#discussion_r212321052 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java ## @@ -237,7 +239,18 @@ public void addCheckpoint(final CompletedCheckpoint checkpoint) throws Exception // Everything worked, let's remove a previous checkpoint if necessary. while (completedCheckpoints.size() > maxNumberOfCheckpointsToRetain) { try { - removeSubsumed(completedCheckpoints.removeFirst()); + final CompletedCheckpoint completedCheckpoint = completedCheckpoints.removeFirst(); Review comment: I would try to move the whole try/catch into one method to deduplicate code with `shutdown()`, e.g.: ``` void tryRemove(Runnable doRemove) { try { // .. doRemove.run(); // completedCheckpoint.discardOnSubsume(); or OnShutdown // .. } catch { // ... } } ``` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Old job resurrected during HA failover > -- > > Key: FLINK-10011 > URL: https://issues.apache.org/jira/browse/FLINK-10011 > Project: Flink > Issue Type: Bug > Components: JobManager >Affects Versions: 1.4.2, 1.5.2, 1.6.0 >Reporter: Elias Levy >Assignee: Till Rohrmann >Priority: Blocker > Labels: pull-request-available > > For the second time we've observed Flink resurrect an old job during > JobManager high-availability fail over. > h4. Configuration > * AWS environment > * Flink 1.4.2 standalong cluster in HA mode > * 2 JMs, 3 TMs > * 3 node ZK ensemble > * 1 job consuming to/from Kafka > * Checkpoints in S3 using the Presto file system adaptor > h4. Timeline > * 15:18:10 JM 2 completes checkpoint 69256. > * 15:19:10 JM 2 completes checkpoint 69257. > * 15:19:57 ZK 1 (follower) loses connectivity to the leader as a result of a > SocketTimeoutException > * 15:19:57 ZK 1 closes connection to JM 2 (leader) > * 15:19:57 ZK 2 (leader) reports a network error and closes connection to ZK > 1 > * 15:19:57 JM 2 reports it can't read data from ZK > ** {{Unable to read additional data from server sessionid 0x3003f4a0003, > likely server has closed socket, closing socket connection and attempting > reconnect)}} > ** {{org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn}} > * 15:19:57 JM 2 ZK Curator changes connection state to SUSPENDED > ** {{Connection to ZooKeeper suspended. Can no longer retrieve the leader > from ZooKeeper.}} > ** {{ZooKeeper connection SUSPENDED. }}{{Changes to the submitted job graphs > are not monitored (temporarily).}} > ** {{Connection to ZooKeeper suspended. The contender > akka.tcp://flink@flink-jm-2:6123/user/jobmanager no longer participates in > the leader election}}{{ }} > ** {{Connection to ZooKeeper suspended. 
Can no longer retrieve the leader > from ZooKeeper.}} > * 15:19:57 JM 2 gives up leadership > ** {{JobManager akka://flink/user/jobmanager#33755521 was revoked > leadership.}} > * 15:19:57 JM 2 changes job > {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color} status to SUSPENDED > ** {{Stopping checkpoint coordinator for job > {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}}} > * 15:19:57 TMs start disasociating with JM 2, but JM 2 discard the messages > because there is no leader > ** {{Discard message > LeaderSessionMessage(d29e9f38-a16d-4c87-b34f-5212caab0473,Disconnect(b97363d53ad22aedfebdc8e5ba3c672f,java.lang.Exception: > TaskManager akka://flink/user/taskmanager is disassociating)) because there > is currently no valid leader id known.}} > * 15:19:57 JM 2 connects to ZK 2 and renews its session > ** {{Opening socket connection to server > ip-10-210-43-221.ec2.internal/10.210.43.221:2181}} > ** {{Socket connection established to > ip-10-210-43-221.ec2.internal/10.210.43.221:2181, initiating session}} > ** {{Connection to ZooKeeper was reconnected. Leader retrieval can be > restarted.}} > ** {{Session establishment complete on server > ip-10-210-43-221.ec2.internal/10.210.43.221:2181, sessionid = > 0x3003f4a0003, ne
[jira] [Commented] (FLINK-10011) Old job resurrected during HA failover
[ https://issues.apache.org/jira/browse/FLINK-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590519#comment-16590519 ] ASF GitHub Bot commented on FLINK-10011: azagrebin commented on a change in pull request #6587: [FLINK-10011] Release JobGraph from SubmittedJobGraphStore URL: https://github.com/apache/flink/pull/6587#discussion_r212379451 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperHAJobManagerTest.java ## @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmanager; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.runtime.akka.ActorUtils; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.akka.ListeningBehaviour; +import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; +import org.apache.flink.runtime.dispatcher.DispatcherHATest; +import org.apache.flink.runtime.dispatcher.NoOpSubmittedJobGraphListener; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.instance.AkkaActorGateway; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; +import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.metrics.NoOpMetricRegistry; +import org.apache.flink.runtime.testingUtils.TestingJobManager; +import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.util.ZooKeeperUtils; +import org.apache.flink.runtime.zookeeper.ZooKeeperResource; +import org.apache.flink.util.TestLogger; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.ExtendedActorSystem; +import akka.actor.Identify; +import akka.actor.Terminated; +import akka.pattern.Patterns; +import org.apache.curator.framework.CuratorFramework; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.util.Collection; +import java.util.concurrent.TimeUnit; + +import scala.Option; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; 
+import static org.hamcrest.Matchers.notNullValue; +import static org.junit.Assert.assertThat; + +/** + * Tests for the ZooKeeper HA service and {@link JobManager} interaction. + */ +public class ZooKeeperHAJobManagerTest extends TestLogger { + + @ClassRule + public static final ZooKeeperResource ZOO_KEEPER_RESOURCE = new ZooKeeperResource(); + + @ClassRule + public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + private static final FiniteDuration TIMEOUT = FiniteDuration.apply(10L, TimeUnit.SECONDS); + + private static ActorSystem system; + + @BeforeClass + public static void setup() { + system = AkkaUtils.createLocalActorSystem(new Configuration()); + } + + @AfterClass + public static void teardown() throws Exception { + final Future terminationFuture = system.terminate(); + Await.ready(terminationFuture, TIMEOUT); + } + + /** +* Tests that the {@link JobManager} releases all locked {@link JobGraph} if it loses +* leadership. +*/ + @Test + public void testJobGraphReleaseWhenLosingLeadership() throws Exception { + final Configuration configuration = new Configuration(); + configuration.setString(HighAvailabilityOptions.HA_Z
[GitHub] azagrebin commented on a change in pull request #6587: [FLINK-10011] Release JobGraph from SubmittedJobGraphStore
azagrebin commented on a change in pull request #6587: [FLINK-10011] Release JobGraph from SubmittedJobGraphStore URL: https://github.com/apache/flink/pull/6587#discussion_r212379451 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperHAJobManagerTest.java ## @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmanager; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.runtime.akka.ActorUtils; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.akka.ListeningBehaviour; +import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; +import org.apache.flink.runtime.dispatcher.DispatcherHATest; +import org.apache.flink.runtime.dispatcher.NoOpSubmittedJobGraphListener; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.instance.AkkaActorGateway; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; +import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.metrics.NoOpMetricRegistry; +import org.apache.flink.runtime.testingUtils.TestingJobManager; +import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.util.ZooKeeperUtils; +import org.apache.flink.runtime.zookeeper.ZooKeeperResource; +import org.apache.flink.util.TestLogger; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.ExtendedActorSystem; +import akka.actor.Identify; +import akka.actor.Terminated; +import akka.pattern.Patterns; +import org.apache.curator.framework.CuratorFramework; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.util.Collection; +import java.util.concurrent.TimeUnit; + +import scala.Option; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.Assert.assertThat; + +/** + * Tests for the ZooKeeper HA service and {@link JobManager} interaction. 
+ */ +public class ZooKeeperHAJobManagerTest extends TestLogger { + + @ClassRule + public static final ZooKeeperResource ZOO_KEEPER_RESOURCE = new ZooKeeperResource(); + + @ClassRule + public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + private static final FiniteDuration TIMEOUT = FiniteDuration.apply(10L, TimeUnit.SECONDS); + + private static ActorSystem system; + + @BeforeClass + public static void setup() { + system = AkkaUtils.createLocalActorSystem(new Configuration()); + } + + @AfterClass + public static void teardown() throws Exception { + final Future terminationFuture = system.terminate(); + Await.ready(terminationFuture, TIMEOUT); + } + + /** +* Tests that the {@link JobManager} releases all locked {@link JobGraph} if it loses +* leadership. +*/ + @Test + public void testJobGraphReleaseWhenLosingLeadership() throws Exception { + final Configuration configuration = new Configuration(); + configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, ZOO_KEEPER_RESOURCE.getConnectString()); + configuration.setString(HighAvailabilityOptions.HA_STORAGE_PATH, TEMPORARY_FOLDER.newFolder().getAbsolutePath()); + + final TestingHighAvailabilityServices highAvailabil
[GitHub] azagrebin commented on a change in pull request #6587: [FLINK-10011] Release JobGraph from SubmittedJobGraphStore
azagrebin commented on a change in pull request #6587: [FLINK-10011] Release JobGraph from SubmittedJobGraphStore URL: https://github.com/apache/flink/pull/6587#discussion_r212321052 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java ## @@ -237,7 +239,18 @@ public void addCheckpoint(final CompletedCheckpoint checkpoint) throws Exception // Everything worked, let's remove a previous checkpoint if necessary. while (completedCheckpoints.size() > maxNumberOfCheckpointsToRetain) { try { - removeSubsumed(completedCheckpoints.removeFirst()); + final CompletedCheckpoint completedCheckpoint = completedCheckpoints.removeFirst(); Review comment: I would try to move the whole try/catch into one method to deduplicate code with `shutdown()`, e.g.: ``` void tryRemove(Runnable doRemove) { try { // .. doRemove.run(); // completedCheckpoint.discardOnSubsume(); or OnShutdown // .. } catch { // ... } } ``` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
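Spelled out slightly further, the helper suggested in the review could take a shape like the one below, with the discard action passed in by each caller. Apart from discardOnSubsume/discardOnShutdown, which the comment itself names, the identifiers are placeholders, and ThrowingRunnable stands for any functional interface whose run() may throw:

```java
// Hypothetical helper: one place owns the try/catch, callers only supply the discard action,
// e.g. tryRemoveCompletedCheckpoint(cp, cp::discardOnSubsume) on the subsume path and
// tryRemoveCompletedCheckpoint(cp, cp::discardOnShutdown) in shutdown().
private void tryRemoveCompletedCheckpoint(CompletedCheckpoint checkpoint, ThrowingRunnable<Exception> discardAction) {
    try {
        if (removeFromZooKeeper(checkpoint)) { // placeholder for the actual ZK removal
            discardAction.run();
        }
    } catch (Exception e) {
        LOG.warn("Failed to discard completed checkpoint {}.", checkpoint.getCheckpointID(), e);
    }
}
```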
[jira] [Commented] (FLINK-10136) Add REPEAT supported in Table API and SQL
[ https://issues.apache.org/jira/browse/FLINK-10136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590453#comment-16590453 ] ASF GitHub Bot commented on FLINK-10136: yanghua commented on issue #6597: [FLINK-10136] [table] Add REPEAT supported in Table API and SQL URL: https://github.com/apache/flink/pull/6597#issuecomment-415478057 cc @xccui This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add REPEAT supported in Table API and SQL > - > > Key: FLINK-10136 > URL: https://issues.apache.org/jira/browse/FLINK-10136 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: vinoyang >Assignee: vinoyang >Priority: Minor > Labels: pull-request-available > > Oracle : > [https://docs.oracle.com/cd/E17952_01/mysql-5.1-en/string-functions.html#function_repeat] > MySql: > https://dev.mysql.com/doc/refman/5.5/en/string-functions.html#function_repeat -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] yanghua commented on issue #6597: [FLINK-10136] [table] Add REPEAT supported in Table API and SQL
yanghua commented on issue #6597: [FLINK-10136] [table] Add REPEAT supported in Table API and SQL URL: https://github.com/apache/flink/pull/6597#issuecomment-415478057 cc @xccui This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] StefanRRichter commented on issue #6604: [FLINK-9061] Optionally add entropy to checkpoint paths better S3 scalability
StefanRRichter commented on issue #6604: [FLINK-9061] Optionally add entropy to checkpoint paths better S3 scalability URL: https://github.com/apache/flink/pull/6604#issuecomment-415476287 Modulo some test failures from NPEs. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-9061) add entropy to s3 path for better scalability
[ https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590443#comment-16590443 ] ASF GitHub Bot commented on FLINK-9061: --- StefanRRichter commented on issue #6604: [FLINK-9061] Optionally add entropy to checkpoint paths better S3 scalability URL: https://github.com/apache/flink/pull/6604#issuecomment-415476287 Modulo some test failures from NPEs. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > add entropy to s3 path for better scalability > - > > Key: FLINK-9061 > URL: https://issues.apache.org/jira/browse/FLINK-9061 > Project: Flink > Issue Type: Bug > Components: FileSystem, State Backends, Checkpointing >Affects Versions: 1.5.0, 1.4.2 >Reporter: Jamie Grier >Assignee: Indrajit Roychoudhury >Priority: Critical > Labels: pull-request-available > > I think we need to modify the way we write checkpoints to S3 for high-scale > jobs (those with many total tasks). The issue is that we are writing all the > checkpoint data under a common key prefix. This is the worst case scenario > for S3 performance since the key is used as a partition key. > > In the worst case checkpoints fail with a 500 status code coming back from S3 > and an internal error type of TooBusyException. > > One possible solution would be to add a hook in the Flink filesystem code > that allows me to "rewrite" paths. For example say I have the checkpoint > directory set to: > > s3://bucket/flink/checkpoints > > I would hook that and rewrite that path to: > > s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original > path > > This would distribute the checkpoint write load around the S3 cluster evenly. > > For reference: > https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/ > > Any other people hit this issue? Any other ideas for solutions? This is a > pretty serious problem for people trying to checkpoint to S3. > > -Jamie > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
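The rewrite idea from the ticket boils down to splicing a random (or hashed) segment into the checkpoint path before it is used as an S3 key, so that keys spread across S3 partitions. A self-contained sketch of that transformation, using a made-up marker convention rather than Flink's actual configuration keys:

```java
import java.util.concurrent.ThreadLocalRandom;

public class EntropyPathExample {

    /** Replaces an entropy marker in the path with random hex characters. */
    static String injectEntropy(String path, String marker, int entropyLength) {
        StringBuilder entropy = new StringBuilder(entropyLength);
        ThreadLocalRandom rnd = ThreadLocalRandom.current();
        for (int i = 0; i < entropyLength; i++) {
            entropy.append(Character.forDigit(rnd.nextInt(16), 16));
        }
        return path.replace(marker, entropy.toString());
    }

    public static void main(String[] args) {
        // e.g. s3://bucket/_entropy_/flink/checkpoints -> s3://bucket/7f3a/flink/checkpoints
        System.out.println(injectEntropy("s3://bucket/_entropy_/flink/checkpoints", "_entropy_", 4));
    }
}
```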
[jira] [Commented] (FLINK-10203) Support truncate method for old Hadoop versions in HadoopRecoverableFsDataOutputStream
[ https://issues.apache.org/jira/browse/FLINK-10203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590401#comment-16590401 ] ASF GitHub Bot commented on FLINK-10203: art4ul commented on issue #6608: [FLINK-10203]Support truncate method for old Hadoop versions in HadoopRecoverableFsDataOutputStream URL: https://github.com/apache/flink/pull/6608#issuecomment-415465746 @StephanEwen I try to explain you our case: the new Streaming FileSink ideally suitable for write into parquet files and we was so exited when founded it in the new 1.6 version , because we worked on our Sink implementation in parallel . But as I mention before we use latest Cloudera 5.15 distributive based on Hadoop 2.6 and unfortunately we can't upgrade it to higher version of Hadoop. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Support truncate method for old Hadoop versions in > HadoopRecoverableFsDataOutputStream > -- > > Key: FLINK-10203 > URL: https://issues.apache.org/jira/browse/FLINK-10203 > Project: Flink > Issue Type: Bug > Components: DataStream API, filesystem-connector >Affects Versions: 1.6.0, 1.6.1, 1.7.0 >Reporter: Artsem Semianenka >Priority: Major > Labels: pull-request-available > > New StreamingFileSink ( introduced in 1.6 Flink version ) use > HadoopRecoverableFsDataOutputStream wrapper to write data in HDFS. > HadoopRecoverableFsDataOutputStream is a wrapper for FSDataOutputStream to > have an ability to restore from certain point of file after failure and > continue write data. To achieve this recover functionality the > HadoopRecoverableFsDataOutputStream use "truncate" method which was > introduced only in Hadoop 2.7 . > Unfortunately there are a few official Hadoop distributive which latest > version still use Hadoop 2.6 (This distributives: Cloudera, Pivotal HD ). As > the result Flinks Hadoop connector can't work with this distributives. > Flink declares that supported Hadoop from version 2.4.0 upwards > ([https://ci.apache.org/projects/flink/flink-docs-release-1.6/start/building.html#hadoop-versions]) > I guess we should emulate the functionality of "truncate" method for older > Hadoop versions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] art4ul commented on issue #6608: [FLINK-10203]Support truncate method for old Hadoop versions in HadoopRecoverableFsDataOutputStream
art4ul commented on issue #6608: [FLINK-10203]Support truncate method for old Hadoop versions in HadoopRecoverableFsDataOutputStream URL: https://github.com/apache/flink/pull/6608#issuecomment-415465746 @StephanEwen Let me explain our case: the new StreamingFileSink is ideally suited for writing Parquet files, and we were very excited when we found it in the new 1.6 version, because we had been working on our own sink implementation in parallel. But as I mentioned before, we use the latest Cloudera 5.15 distribution, which is based on Hadoop 2.6, and unfortunately we can't upgrade it to a higher Hadoop version. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-9559) The type of a union of CHAR columns of different lengths should be VARCHAR
[ https://issues.apache.org/jira/browse/FLINK-9559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590380#comment-16590380 ] ASF GitHub Bot commented on FLINK-9559: --- hequn8128 commented on issue #6519: [FLINK-9559] [table] The type of a union of CHAR columns of different lengths should be VARCHAR URL: https://github.com/apache/flink/pull/6519#issuecomment-415459642 @pnowojski Hi, thanks for your review. Yes, support VARCHAR(N) can solve the problem. As a workaround, we can cast the literals to varchar to solve the problem, like: `case 1 when 1 then cast('a' as varchar) when 2 then cast('bcd' as varchar) end`. The behavior in strict SQL standard mode(SQL:2003) returns a CHAR(N) type with blank-padded. However, I test Oracle/SqlServer/Mysql and they all return the value without blank-padded. Also, it seems that there is no way to turn blank-padded on. This change will break implementations in Flink. Do you think we should make it configurable? Thanks, Hequn This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > The type of a union of CHAR columns of different lengths should be VARCHAR > -- > > Key: FLINK-9559 > URL: https://issues.apache.org/jira/browse/FLINK-9559 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Hequn Cheng >Assignee: Hequn Cheng >Priority: Major > Labels: pull-request-available > > Currently, If the case-when expression has two branches which return string > literal, redundant white spaces will be appended to the short string literal. > For example, for the sql: case 1 when 1 then 'a' when 2 then 'bcd' end, the > return value will be 'a ' of CHAR(3) instead of 'a'. > Although, this follows the behavior in strict SQL standard mode(SQL:2003). We > should get the pragmatic return type in a real scenario without blank-padded. > Happily, this problem has been fixed by > [CALCITE-2321|https://issues.apache.org/jira/browse/CALCITE-2321], we can > upgrade calcite to the next release(1.17.0) and override > {{RelDataTypeSystem}} in flink to configure the return type, i.e., making > {{shouldConvertRaggedUnionTypesToVarying()}} return true. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] hequn8128 commented on issue #6519: [FLINK-9559] [table] The type of a union of CHAR columns of different lengths should be VARCHAR
hequn8128 commented on issue #6519: [FLINK-9559] [table] The type of a union of CHAR columns of different lengths should be VARCHAR URL: https://github.com/apache/flink/pull/6519#issuecomment-415459642 @pnowojski Hi, thanks for your review. Yes, supporting VARCHAR(N) could solve the problem. As a workaround, we can cast the literals to varchar, like: `case 1 when 1 then cast('a' as varchar) when 2 then cast('bcd' as varchar) end`. In strict SQL standard mode (SQL:2003) the result is a blank-padded CHAR(N) type. However, I tested Oracle/SqlServer/Mysql and they all return the value without blank-padding. Also, there seems to be no way to turn blank-padding on. This change will break the existing behavior in Flink. Do you think we should make it configurable? Thanks, Hequn This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
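For reference, the Calcite hook the ticket points to is a single override on the planner's type system. A hedged illustration of what making shouldConvertRaggedUnionTypesToVarying() return true looks like (the class name is invented, and how Flink wires it into the planner is not shown):

```java
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.rel.type.RelDataTypeSystemImpl;

/** Type system that widens a union of CHAR columns of different lengths to VARCHAR. */
public class RaggedUnionVaryingTypeSystem extends RelDataTypeSystemImpl {

    public static final RelDataTypeSystem INSTANCE = new RaggedUnionVaryingTypeSystem();

    @Override
    public boolean shouldConvertRaggedUnionTypesToVarying() {
        // With Calcite 1.17+, CASE/UNION over 'a' and 'bcd' then yields VARCHAR,
        // i.e. 'a' instead of the blank-padded 'a  '.
        return true;
    }
}
```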
[jira] [Commented] (FLINK-9061) add entropy to s3 path for better scalability
[ https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590366#comment-16590366 ] ASF GitHub Bot commented on FLINK-9061: --- StephanEwen commented on issue #6604: [FLINK-9061] Optionally add entropy to checkpoint paths better S3 scalability URL: https://github.com/apache/flink/pull/6604#issuecomment-415454197 Okay, updated the implementation. Moved the entropy injection code tho the FS implementation, which is much more clean. PR description updated... This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > add entropy to s3 path for better scalability > - > > Key: FLINK-9061 > URL: https://issues.apache.org/jira/browse/FLINK-9061 > Project: Flink > Issue Type: Bug > Components: FileSystem, State Backends, Checkpointing >Affects Versions: 1.5.0, 1.4.2 >Reporter: Jamie Grier >Assignee: Indrajit Roychoudhury >Priority: Critical > Labels: pull-request-available > > I think we need to modify the way we write checkpoints to S3 for high-scale > jobs (those with many total tasks). The issue is that we are writing all the > checkpoint data under a common key prefix. This is the worst case scenario > for S3 performance since the key is used as a partition key. > > In the worst case checkpoints fail with a 500 status code coming back from S3 > and an internal error type of TooBusyException. > > One possible solution would be to add a hook in the Flink filesystem code > that allows me to "rewrite" paths. For example say I have the checkpoint > directory set to: > > s3://bucket/flink/checkpoints > > I would hook that and rewrite that path to: > > s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original > path > > This would distribute the checkpoint write load around the S3 cluster evenly. > > For reference: > https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/ > > Any other people hit this issue? Any other ideas for solutions? This is a > pretty serious problem for people trying to checkpoint to S3. > > -Jamie > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] StephanEwen commented on issue #6604: [FLINK-9061] Optionally add entropy to checkpoint paths better S3 scalability
StephanEwen commented on issue #6604: [FLINK-9061] Optionally add entropy to checkpoint paths better S3 scalability URL: https://github.com/apache/flink/pull/6604#issuecomment-415454197 Okay, updated the implementation. Moved the entropy injection code to the FS implementation, which is much cleaner. PR description updated... This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
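To make the idea from the ticket concrete, here is a small, hypothetical sketch of the path rewrite it describes: inserting a hash-derived segment after the bucket so that S3 spreads keys across partitions. The class and method names are made up; this is only an illustration of the concept, not the code from PR #6604, which injects the entropy inside the file-system implementation.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.Locale;

/** Hypothetical helper illustrating the FLINK-9061 idea; not Flink's actual implementation. */
public final class EntropyPathSketch {

    /** Inserts a short hash segment after the bucket so S3 distributes the write load. */
    public static String addEntropy(String basePath, String relativePath) throws Exception {
        MessageDigest digest = MessageDigest.getInstance("MD5");
        byte[] hash = digest.digest(relativePath.getBytes(StandardCharsets.UTF_8));
        // Two bytes (four hex characters) of entropy are plenty to spread keys.
        String entropy = String.format(Locale.ROOT, "%02x%02x", hash[0] & 0xFF, hash[1] & 0xFF);
        return basePath + "/" + entropy + "/" + relativePath;
    }

    public static void main(String[] args) throws Exception {
        // Prints something like: s3://bucket/1f3a/flink/checkpoints/chk-42/op-0
        System.out.println(addEntropy("s3://bucket", "flink/checkpoints/chk-42/op-0"));
    }
}
```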
[jira] [Commented] (FLINK-10203) Support truncate method for old Hadoop versions in HadoopRecoverableFsDataOutputStream
[ https://issues.apache.org/jira/browse/FLINK-10203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590362#comment-16590362 ] ASF GitHub Bot commented on FLINK-10203: art4ul commented on issue #6608: [FLINK-10203]Support truncate method for old Hadoop versions in HadoopRecoverableFsDataOutputStream URL: https://github.com/apache/flink/pull/6608#issuecomment-415453555 @StephanEwen Yes it's critical because we need to write Parquet files . This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Support truncate method for old Hadoop versions in > HadoopRecoverableFsDataOutputStream > -- > > Key: FLINK-10203 > URL: https://issues.apache.org/jira/browse/FLINK-10203 > Project: Flink > Issue Type: Bug > Components: DataStream API, filesystem-connector >Affects Versions: 1.6.0, 1.6.1, 1.7.0 >Reporter: Artsem Semianenka >Priority: Major > Labels: pull-request-available > > New StreamingFileSink ( introduced in 1.6 Flink version ) use > HadoopRecoverableFsDataOutputStream wrapper to write data in HDFS. > HadoopRecoverableFsDataOutputStream is a wrapper for FSDataOutputStream to > have an ability to restore from certain point of file after failure and > continue write data. To achieve this recover functionality the > HadoopRecoverableFsDataOutputStream use "truncate" method which was > introduced only in Hadoop 2.7 . > Unfortunately there are a few official Hadoop distributive which latest > version still use Hadoop 2.6 (This distributives: Cloudera, Pivotal HD ). As > the result Flinks Hadoop connector can't work with this distributives. > Flink declares that supported Hadoop from version 2.4.0 upwards > ([https://ci.apache.org/projects/flink/flink-docs-release-1.6/start/building.html#hadoop-versions]) > I guess we should emulate the functionality of "truncate" method for older > Hadoop versions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] art4ul commented on issue #6608: [FLINK-10203]Support truncate method for old Hadoop versions in HadoopRecoverableFsDataOutputStream
art4ul commented on issue #6608: [FLINK-10203]Support truncate method for old Hadoop versions in HadoopRecoverableFsDataOutputStream URL: https://github.com/apache/flink/pull/6608#issuecomment-415453555 @StephanEwen Yes, it's critical because we need to write Parquet files. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-10203) Support truncate method for old Hadoop versions in HadoopRecoverableFsDataOutputStream
[ https://issues.apache.org/jira/browse/FLINK-10203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Artsem Semianenka updated FLINK-10203: -- Description: New StreamingFileSink ( introduced in 1.6 Flink version ) use HadoopRecoverableFsDataOutputStream wrapper to write data in HDFS. HadoopRecoverableFsDataOutputStream is a wrapper for FSDataOutputStream to have an ability to restore from certain point of file after failure and continue write data. To achieve this recover functionality the HadoopRecoverableFsDataOutputStream use "truncate" method which was introduced only in Hadoop 2.7 . Unfortunately there are a few official Hadoop distributive which latest version still use Hadoop 2.6 (This distributives: Cloudera, Pivotal HD ). As the result Flinks Hadoop connector can't work with this distributives. Flink declares that supported Hadoop from version 2.4.0 upwards ([https://ci.apache.org/projects/flink/flink-docs-release-1.6/start/building.html#hadoop-versions]) I guess we should emulate the functionality of "truncate" method for older Hadoop versions. was: New StreamingFileSink ( introduced in 1.6 Flink version ) use HadoopRecoverableFsDataOutputStream wrapper to write data in HDFS. HadoopRecoverableFsDataOutputStream is a wrapper for FSDataOutputStream to have an ability to restore from certain point of file after failure and continue write data. To achive this recover functionality the HadoopRecoverableFsDataOutputStream use "truncate" method which was introduced only in Hadoop 2.7 . Unfortently there are a few official Hadoop distibutives which latast version still use Hadoop 2.6 (This distibutives: Cloudera, Pivotal HD ). As the result Flink can't work with this distibutives. Flink declares that supported Hadoop from version 2.4.0 upwards ([https://ci.apache.org/projects/flink/flink-docs-release-1.6/start/building.html#hadoop-versions]) I guess we should emulate the functionality of "truncate" method for older Hadoop versions. > Support truncate method for old Hadoop versions in > HadoopRecoverableFsDataOutputStream > -- > > Key: FLINK-10203 > URL: https://issues.apache.org/jira/browse/FLINK-10203 > Project: Flink > Issue Type: Bug > Components: DataStream API, filesystem-connector >Affects Versions: 1.6.0, 1.6.1, 1.7.0 >Reporter: Artsem Semianenka >Priority: Major > Labels: pull-request-available > > New StreamingFileSink ( introduced in 1.6 Flink version ) use > HadoopRecoverableFsDataOutputStream wrapper to write data in HDFS. > HadoopRecoverableFsDataOutputStream is a wrapper for FSDataOutputStream to > have an ability to restore from certain point of file after failure and > continue write data. To achieve this recover functionality the > HadoopRecoverableFsDataOutputStream use "truncate" method which was > introduced only in Hadoop 2.7 . > Unfortunately there are a few official Hadoop distributive which latest > version still use Hadoop 2.6 (This distributives: Cloudera, Pivotal HD ). As > the result Flinks Hadoop connector can't work with this distributives. > Flink declares that supported Hadoop from version 2.4.0 upwards > ([https://ci.apache.org/projects/flink/flink-docs-release-1.6/start/building.html#hadoop-versions]) > I guess we should emulate the functionality of "truncate" method for older > Hadoop versions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10203) Support truncate method for old Hadoop versions in HadoopRecoverableFsDataOutputStream
[ https://issues.apache.org/jira/browse/FLINK-10203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Artsem Semianenka updated FLINK-10203: -- Description: New StreamingFileSink ( introduced in 1.6 Flink version ) use HadoopRecoverableFsDataOutputStream wrapper to write data in HDFS. HadoopRecoverableFsDataOutputStream is a wrapper for FSDataOutputStream to have an ability to restore from certain point of file after failure and continue write data. To achive this recover functionality the HadoopRecoverableFsDataOutputStream use "truncate" method which was introduced only in Hadoop 2.7 . Unfortently there are a few official Hadoop distibutives which latast version still use Hadoop 2.6 (This distibutives: Cloudera, Pivotal HD ). As the result Flink can't work with this distibutives. Flink declares that supported Hadoop from version 2.4.0 upwards ([https://ci.apache.org/projects/flink/flink-docs-release-1.6/start/building.html#hadoop-versions]) I guess we should emulate the functionality of "truncate" method for older Hadoop versions. was: New StreamingFileSink ( introduced in 1.6 Flink version ) use HadoopRecoverableFsDataOutputStream wrapper to write data in HDFS. HadoopRecoverableFsDataOutputStream is an wrapper on FSDataOutputStream to have an ability to restore from certain point of file after failure and continue write data. To achive this recover functionality the HadoopRecoverableFsDataOutputStream use "truncate" method which was introduced only in Hadoop 2.7 . Unfortently there are a few official Hadoop distibutives which latast version still use Hadoop 2.6 (This distibutives: Cloudera, Pivotal HD ). As the result Flink can't work with this distibutives. Flink declares that supported Hadoop from version 2.4.0 upwards (https://ci.apache.org/projects/flink/flink-docs-release-1.6/start/building.html#hadoop-versions) I guess we should emulate the functionality of "truncate" method for older Hadoop versions. > Support truncate method for old Hadoop versions in > HadoopRecoverableFsDataOutputStream > -- > > Key: FLINK-10203 > URL: https://issues.apache.org/jira/browse/FLINK-10203 > Project: Flink > Issue Type: Bug > Components: DataStream API, filesystem-connector >Affects Versions: 1.6.0, 1.6.1, 1.7.0 >Reporter: Artsem Semianenka >Priority: Major > Labels: pull-request-available > > New StreamingFileSink ( introduced in 1.6 Flink version ) use > HadoopRecoverableFsDataOutputStream wrapper to write data in HDFS. > HadoopRecoverableFsDataOutputStream is a wrapper for FSDataOutputStream to > have an ability to restore from certain point of file after failure and > continue write data. To achive this recover functionality the > HadoopRecoverableFsDataOutputStream use "truncate" method which was > introduced only in Hadoop 2.7 . > Unfortently there are a few official Hadoop distibutives which latast version > still use Hadoop 2.6 (This distibutives: Cloudera, Pivotal HD ). As the > result Flink can't work with this distibutives. > Flink declares that supported Hadoop from version 2.4.0 upwards > ([https://ci.apache.org/projects/flink/flink-docs-release-1.6/start/building.html#hadoop-versions]) > I guess we should emulate the functionality of "truncate" method for older > Hadoop versions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10203) Support truncate method for old Hadoop versions in HadoopRecoverableFsDataOutputStream
[ https://issues.apache.org/jira/browse/FLINK-10203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590352#comment-16590352 ] ASF GitHub Bot commented on FLINK-10203: StephanEwen commented on issue #6608: [FLINK-10203]Support truncate method for old Hadoop versions in HadoopRecoverableFsDataOutputStream URL: https://github.com/apache/flink/pull/6608#issuecomment-415451715 @art4ul Initially, we wanted to keep the new StreamingFileSink code simple and clean, that's why we decided to only with Hadoop 2.7+ for HDFS and retained the old BucketingSink, to support prior Hadoop versions. Is it a critical issue on your side to not use the BucketingSink? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Support truncate method for old Hadoop versions in > HadoopRecoverableFsDataOutputStream > -- > > Key: FLINK-10203 > URL: https://issues.apache.org/jira/browse/FLINK-10203 > Project: Flink > Issue Type: Bug > Components: DataStream API, filesystem-connector >Affects Versions: 1.6.0, 1.6.1, 1.7.0 >Reporter: Artsem Semianenka >Priority: Major > Labels: pull-request-available > > New StreamingFileSink ( introduced in 1.6 Flink version ) use > HadoopRecoverableFsDataOutputStream wrapper to write data in HDFS. > HadoopRecoverableFsDataOutputStream is an wrapper on FSDataOutputStream to > have an ability to restore from certain point of file after failure and > continue write data. To achive this recover functionality the > HadoopRecoverableFsDataOutputStream use "truncate" method which was > introduced only in Hadoop 2.7 . > Unfortently there are a few official Hadoop distibutives which latast version > still use Hadoop 2.6 (This distibutives: Cloudera, Pivotal HD ). As the > result Flink can't work with this distibutives. > Flink declares that supported Hadoop from version 2.4.0 upwards > (https://ci.apache.org/projects/flink/flink-docs-release-1.6/start/building.html#hadoop-versions) > I guess we should emulate the functionality of "truncate" method for older > Hadoop versions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] StephanEwen commented on issue #6608: [FLINK-10203]Support truncate method for old Hadoop versions in HadoopRecoverableFsDataOutputStream
StephanEwen commented on issue #6608: [FLINK-10203]Support truncate method for old Hadoop versions in HadoopRecoverableFsDataOutputStream URL: https://github.com/apache/flink/pull/6608#issuecomment-415451715 @art4ul Initially, we wanted to keep the new StreamingFileSink code simple and clean; that's why we decided to only support Hadoop 2.7+ for HDFS and retained the old BucketingSink to support prior Hadoop versions. Is it a critical issue on your side that you cannot use the BucketingSink instead? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10198) Set Env object in DBOptions for RocksDB
[ https://issues.apache.org/jira/browse/FLINK-10198?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590340#comment-16590340 ] ASF GitHub Bot commented on FLINK-10198: StephanEwen commented on issue #6603: [FLINK-10198][state] Set Env object in DBOptions for RocksDB URL: https://github.com/apache/flink/pull/6603#issuecomment-415448851 This has big implication on how users configure their memory. Previously, there was one buffer pool per state, now there is one buffer pool total. So it definitely breaks configurations (lowering overall memory usage, but impacting performance). Is there a concrete reason to add this now, or maybe at some point when we revisit the overall RocksDB configuration model? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Set Env object in DBOptions for RocksDB > --- > > Key: FLINK-10198 > URL: https://issues.apache.org/jira/browse/FLINK-10198 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.7.0 >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Labels: pull-request-available > > I think we should consider to always set a default environment when we create > the DBOptions. > See https://github.com/facebook/rocksdb/wiki/rocksdb-basics: > *Support for Multiple Embedded Databases in the same process* > A common use-case for RocksDB is that applications inherently partition their > data set into logical partitions or shards. This technique benefits > application load balancing and fast recovery from faults. This means that a > single server process should be able to operate multiple RocksDB databases > simultaneously. This is done via an environment object named Env. Among other > things, a thread pool is associated with an Env. If applications want to > share a common thread pool (for background compactions) among multiple > database instances, then it should use the same Env object for opening those > databases. > Similarly, multiple database instances may share the same block cache. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] StephanEwen commented on issue #6603: [FLINK-10198][state] Set Env object in DBOptions for RocksDB
StephanEwen commented on issue #6603: [FLINK-10198][state] Set Env object in DBOptions for RocksDB URL: https://github.com/apache/flink/pull/6603#issuecomment-415448851 This has big implications for how users configure their memory. Previously there was one buffer pool per state; now there is one buffer pool in total. So it definitely breaks configurations (lowering overall memory usage, but impacting performance). Is there a concrete reason to add this now, or could it wait until we revisit the overall RocksDB configuration model? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
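For orientation, a minimal sketch of what "setting the Env object in DBOptions" means at the RocksDB JNI level, assuming the `DBOptions#setEnv`, `Env#getDefault`, and `Env#setBackgroundThreads` methods of the RocksDB Java API; Flink's actual change wires this through its own options handling rather than standalone code like this.

```java
import org.rocksdb.DBOptions;
import org.rocksdb.Env;
import org.rocksdb.RocksDB;

public class SharedRocksDbEnv {

    public static void main(String[] args) {
        RocksDB.loadLibrary();

        // One process-wide Env whose background thread pool is shared by every
        // database opened with options that point at it.
        Env sharedEnv = Env.getDefault();
        sharedEnv.setBackgroundThreads(4);

        try (DBOptions options = new DBOptions()
                .setCreateIfMissing(true)
                .setEnv(sharedEnv)) {
            // All RocksDB instances opened with these options now compete for the
            // same 4 background threads instead of each creating its own pool.
        }
    }
}
```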
[jira] [Commented] (FLINK-9559) The type of a union of CHAR columns of different lengths should be VARCHAR
[ https://issues.apache.org/jira/browse/FLINK-9559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590334#comment-16590334 ] ASF GitHub Bot commented on FLINK-9559: --- pnowojski edited a comment on issue #6519: [FLINK-9559] [table] The type of a union of CHAR columns of different lengths should be VARCHAR URL: https://github.com/apache/flink/pull/6519#issuecomment-415443589 At least according to draft of 2008 SQL Standard (paragraph 9.3, section 3) iii) 3) ) the result type in this case should be `CHAR(N)` where N is the maximum of the inputs. If any of the inputs is `VARCHAR(x)`, the result should be also `VARCHAR`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > The type of a union of CHAR columns of different lengths should be VARCHAR > -- > > Key: FLINK-9559 > URL: https://issues.apache.org/jira/browse/FLINK-9559 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Hequn Cheng >Assignee: Hequn Cheng >Priority: Major > Labels: pull-request-available > > Currently, If the case-when expression has two branches which return string > literal, redundant white spaces will be appended to the short string literal. > For example, for the sql: case 1 when 1 then 'a' when 2 then 'bcd' end, the > return value will be 'a ' of CHAR(3) instead of 'a'. > Although, this follows the behavior in strict SQL standard mode(SQL:2003). We > should get the pragmatic return type in a real scenario without blank-padded. > Happily, this problem has been fixed by > [CALCITE-2321|https://issues.apache.org/jira/browse/CALCITE-2321], we can > upgrade calcite to the next release(1.17.0) and override > {{RelDataTypeSystem}} in flink to configure the return type, i.e., making > {{shouldConvertRaggedUnionTypesToVarying()}} return true. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] pnowojski edited a comment on issue #6519: [FLINK-9559] [table] The type of a union of CHAR columns of different lengths should be VARCHAR
pnowojski edited a comment on issue #6519: [FLINK-9559] [table] The type of a union of CHAR columns of different lengths should be VARCHAR URL: https://github.com/apache/flink/pull/6519#issuecomment-415443589 At least according to a draft of the 2008 SQL Standard (paragraph 9.3, section 3) iii) 3)), the result type in this case should be `CHAR(N)`, where N is the maximum of the inputs. If any of the inputs is `VARCHAR(x)`, the result should also be `VARCHAR`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-9559) The type of a union of CHAR columns of different lengths should be VARCHAR
[ https://issues.apache.org/jira/browse/FLINK-9559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590333#comment-16590333 ] ASF GitHub Bot commented on FLINK-9559: --- pnowojski commented on issue #6519: [FLINK-9559] [table] The type of a union of CHAR columns of different lengths should be VARCHAR URL: https://github.com/apache/flink/pull/6519#issuecomment-415443589 At least according to draft of 2008 SQL Standard, this PR violates it (paragraph 9.3, section 3) iii) 3) says that the result type in this case should be `CHAR(N)` where N is the maximum of the inputs. If any of the inputs is `VARCHAR(x)`, the result should be also `VARCHAR`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > The type of a union of CHAR columns of different lengths should be VARCHAR > -- > > Key: FLINK-9559 > URL: https://issues.apache.org/jira/browse/FLINK-9559 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Hequn Cheng >Assignee: Hequn Cheng >Priority: Major > Labels: pull-request-available > > Currently, If the case-when expression has two branches which return string > literal, redundant white spaces will be appended to the short string literal. > For example, for the sql: case 1 when 1 then 'a' when 2 then 'bcd' end, the > return value will be 'a ' of CHAR(3) instead of 'a'. > Although, this follows the behavior in strict SQL standard mode(SQL:2003). We > should get the pragmatic return type in a real scenario without blank-padded. > Happily, this problem has been fixed by > [CALCITE-2321|https://issues.apache.org/jira/browse/CALCITE-2321], we can > upgrade calcite to the next release(1.17.0) and override > {{RelDataTypeSystem}} in flink to configure the return type, i.e., making > {{shouldConvertRaggedUnionTypesToVarying()}} return true. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] pnowojski commented on issue #6519: [FLINK-9559] [table] The type of a union of CHAR columns of different lengths should be VARCHAR
pnowojski commented on issue #6519: [FLINK-9559] [table] The type of a union of CHAR columns of different lengths should be VARCHAR URL: https://github.com/apache/flink/pull/6519#issuecomment-415443589 At least according to a draft of the 2008 SQL Standard, this PR violates it: paragraph 9.3, section 3) iii) 3) says that the result type in this case should be `CHAR(N)`, where N is the maximum of the inputs. If any of the inputs is `VARCHAR(x)`, the result should also be `VARCHAR`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
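For reference, the fix described in the ticket (once Calcite 1.17 is available) comes down to overriding a single method of the type system. A minimal sketch, with a made-up class name and without Flink's actual planner wiring:

```java
import org.apache.calcite.rel.type.RelDataTypeSystemImpl;

/**
 * Sketch of the override mentioned in FLINK-9559: with Calcite 1.17+,
 * returning true here makes a union of CHAR columns of different lengths
 * come out as VARCHAR instead of a blank-padded CHAR(N).
 */
public class RaggedUnionToVaryingTypeSystem extends RelDataTypeSystemImpl {

    @Override
    public boolean shouldConvertRaggedUnionTypesToVarying() {
        return true;
    }
}
```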
[jira] [Updated] (FLINK-10203) Support truncate method for old Hadoop versions in HadoopRecoverableFsDataOutputStream
[ https://issues.apache.org/jira/browse/FLINK-10203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-10203: --- Labels: pull-request-available (was: ) > Support truncate method for old Hadoop versions in > HadoopRecoverableFsDataOutputStream > -- > > Key: FLINK-10203 > URL: https://issues.apache.org/jira/browse/FLINK-10203 > Project: Flink > Issue Type: Bug > Components: DataStream API, filesystem-connector >Affects Versions: 1.6.0, 1.6.1, 1.7.0 >Reporter: Artsem Semianenka >Priority: Major > Labels: pull-request-available > > New StreamingFileSink ( introduced in 1.6 Flink version ) use > HadoopRecoverableFsDataOutputStream wrapper to write data in HDFS. > HadoopRecoverableFsDataOutputStream is an wrapper on FSDataOutputStream to > have an ability to restore from certain point of file after failure and > continue write data. To achive this recover functionality the > HadoopRecoverableFsDataOutputStream use "truncate" method which was > introduced only in Hadoop 2.7 . > Unfortently there are a few official Hadoop distibutives which latast version > still use Hadoop 2.6 (This distibutives: Cloudera, Pivotal HD ). As the > result Flink can't work with this distibutives. > Flink declares that supported Hadoop from version 2.4.0 upwards > (https://ci.apache.org/projects/flink/flink-docs-release-1.6/start/building.html#hadoop-versions) > I guess we should emulate the functionality of "truncate" method for older > Hadoop versions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10203) Support truncate method for old Hadoop versions in HadoopRecoverableFsDataOutputStream
[ https://issues.apache.org/jira/browse/FLINK-10203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590284#comment-16590284 ] ASF GitHub Bot commented on FLINK-10203: art4ul opened a new pull request #6608: [FLINK-10203]Support truncate method for old Hadoop versions in HadoopRecoverableFsDataOutputStream URL: https://github.com/apache/flink/pull/6608 ## What is the purpose of the change New StreamingFileSink ( introduced in 1.6 Flink version ) use HadoopRecoverableFsDataOutputStream wrapper to write data in HDFS. HadoopRecoverableFsDataOutputStream is an wrapper on FSDataOutputStream to have an ability to restore from certain point of file after failure and continue write data. To achive this recover functionality the HadoopRecoverableFsDataOutputStream use "truncate" method which was introduced only in Hadoop 2.7 . Unfortently there are a few official Hadoop distibutives which latast version still use Hadoop 2.6 (This distibutives: Cloudera, Pivotal HD ). As the result Flink can't work with this distibutives. Flink declares that supported Hadoop from version 2.4.0 upwards (https://ci.apache.org/projects/flink/flink-docs-release-1.6/start/building.html#hadoop-versions) I guess we should emulate the functionality of "truncate" method for older Hadoop versions. This is possible fix . I would like to start discussion here. The fix of this issue is vital for us as Hadoop 2.6 users. ## Brief change log - Add new abstraction TruncateManager - Add Implementation for old Hadoop version ( LegacyTruncateManager) - Add Implementation for Hadoop 2.7 and upwards ## Verifying this change This change contains only possible solution but currently without any test coverage. Tests will be added after final consensus about implementation . ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: ( no) - The runtime per-record code paths (performance sensitive): (don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes) - The S3 file system connector: ( no) ## Documentation - Does this pull request introduce a new feature? (no, but I think it should be documented) - If yes, how is the feature documented? (not documented yet) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Support truncate method for old Hadoop versions in > HadoopRecoverableFsDataOutputStream > -- > > Key: FLINK-10203 > URL: https://issues.apache.org/jira/browse/FLINK-10203 > Project: Flink > Issue Type: Bug > Components: DataStream API, filesystem-connector >Affects Versions: 1.6.0, 1.6.1, 1.7.0 >Reporter: Artsem Semianenka >Priority: Major > Labels: pull-request-available > > New StreamingFileSink ( introduced in 1.6 Flink version ) use > HadoopRecoverableFsDataOutputStream wrapper to write data in HDFS. > HadoopRecoverableFsDataOutputStream is an wrapper on FSDataOutputStream to > have an ability to restore from certain point of file after failure and > continue write data. To achive this recover functionality the > HadoopRecoverableFsDataOutputStream use "truncate" method which was > introduced only in Hadoop 2.7 . 
> Unfortently there are a few official Hadoop distibutives which latast version > still use Hadoop 2.6 (This distibutives: Cloudera, Pivotal HD ). As the > result Flink can't work with this distibutives. > Flink declares that supported Hadoop from version 2.4.0 upwards > (https://ci.apache.org/projects/flink/flink-docs-release-1.6/start/building.html#hadoop-versions) > I guess we should emulate the functionality of "truncate" method for older > Hadoop versions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] art4ul opened a new pull request #6608: [FLINK-10203]Support truncate method for old Hadoop versions in HadoopRecoverableFsDataOutputStream
art4ul opened a new pull request #6608: [FLINK-10203]Support truncate method for old Hadoop versions in HadoopRecoverableFsDataOutputStream URL: https://github.com/apache/flink/pull/6608 ## What is the purpose of the change The new StreamingFileSink (introduced in Flink 1.6) uses the HadoopRecoverableFsDataOutputStream wrapper to write data to HDFS. HadoopRecoverableFsDataOutputStream is a wrapper around FSDataOutputStream that makes it possible to restore from a certain point of a file after a failure and continue writing data. To achieve this recovery functionality, HadoopRecoverableFsDataOutputStream uses the "truncate" method, which was only introduced in Hadoop 2.7. Unfortunately there are a few official Hadoop distributions whose latest versions still use Hadoop 2.6 (for example Cloudera and Pivotal HD). As a result, Flink can't work with these distributions, even though Flink declares that it supports Hadoop from version 2.4.0 upwards (https://ci.apache.org/projects/flink/flink-docs-release-1.6/start/building.html#hadoop-versions). I guess we should emulate the functionality of the "truncate" method for older Hadoop versions. This is a possible fix; I would like to start the discussion here. Fixing this issue is vital for us as Hadoop 2.6 users. ## Brief change log - Add a new abstraction, TruncateManager - Add an implementation for old Hadoop versions (LegacyTruncateManager) - Add an implementation for Hadoop 2.7 and upwards ## Verifying this change This change contains only a possible solution, currently without any test coverage. Tests will be added after a final consensus about the implementation. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no, but I think it should be documented) - If yes, how is the feature documented? (not documented yet) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
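Since the PR text above does not show the implementation, here is one hedged sketch of how a "legacy truncate" could be emulated with only the pre-2.7 FileSystem API: copy the first N bytes into a temporary file and swap it in. The class and method names are hypothetical and this is not the code from PR #6608; as the comment notes, a real implementation also has to cope with failures between the delete and the rename.

```java
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;

/** Hypothetical sketch of a "legacy truncate" for Hadoop < 2.7. */
public class LegacyTruncateSketch {

    /** Emulates FileSystem#truncate by rewriting the first {@code length} bytes into a new file. */
    public static void truncate(FileSystem fs, Path file, long length) throws IOException {
        Path tmp = new Path(file.getParent(), "." + file.getName() + ".truncating");

        byte[] buffer = new byte[4096];
        long remaining = length;
        try (FSDataInputStream in = fs.open(file);
             FSDataOutputStream out = fs.create(tmp, true)) {
            while (remaining > 0) {
                int read = in.read(buffer, 0, (int) Math.min(buffer.length, remaining));
                if (read < 0) {
                    break; // the file is already shorter than the requested length
                }
                out.write(buffer, 0, read);
                remaining -= read;
            }
        }

        // Swap the truncated copy into place; a production version must also
        // handle a crash between the delete and the rename.
        if (!fs.delete(file, false) || !fs.rename(tmp, file)) {
            throw new IOException("Could not replace " + file + " with truncated copy");
        }
    }
}
```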
[jira] [Commented] (FLINK-10050) Support 'allowedLateness' in CoGroupedStreams
[ https://issues.apache.org/jira/browse/FLINK-10050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590245#comment-16590245 ] Aljoscha Krettek commented on FLINK-10050: -- I think {{DataStream.join()}} and {{DataStream.coGroup()}} are a bit of a dead end because they don't allow getting any information about what window the result is in, or other meta information about the window that you would get from a {{ProcessWindowFunction}}. I'm interested if you have a use case for this, where you don't need to know what window your result is in. > Support 'allowedLateness' in CoGroupedStreams > - > > Key: FLINK-10050 > URL: https://issues.apache.org/jira/browse/FLINK-10050 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.5.1, 1.6.0 >Reporter: eugen yushin >Priority: Major > Labels: ready-to-commit, windows > > WindowedStream has a support of 'allowedLateness' feature, while > CoGroupedStreams are not. At the mean time, WindowedStream is an inner part > of CoGroupedStreams and all main functionality (like evictor/trigger/...) is > simply delegated to WindowedStream. > There's no chance to operate with late arriving data from previous steps in > cogroups (and joins). Consider the following flow: > a. read data from source1 -> aggregate data with allowed lateness > b. read data from source2 -> aggregate data with allowed lateness > c. cogroup/join streams a and b, and compare aggregated values > Step c doesn't accept any late data from steps a/b due to lack of > `allowedLateness` API call in CoGroupedStreams.java. > Scope: add method `WithWindow.allowedLateness` to Java API > (flink-streaming-java) and extend scala API (flink-streaming-scala). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
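For readers unfamiliar with the API being discussed, here is a minimal sketch of today's windowed coGroup, with the spot where the requested allowedLateness call would go left as a comment (that call does not exist at this point; the timestamp extractor and the toy input are placeholders invented for the example).

```java
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class CoGroupWindowSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<String> left = withTimestamps(env.fromElements("a", "b"));
        DataStream<String> right = withTimestamps(env.fromElements("a", "c"));

        left.coGroup(right)
                .where(new KeySelector<String, String>() {
                    @Override
                    public String getKey(String value) {
                        return value;
                    }
                })
                .equalTo(new KeySelector<String, String>() {
                    @Override
                    public String getKey(String value) {
                        return value;
                    }
                })
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                // .allowedLateness(Time.minutes(1))  // the call FLINK-10050 asks for; not available here
                .apply(new CoGroupFunction<String, String, String>() {
                    @Override
                    public void coGroup(Iterable<String> l, Iterable<String> r, Collector<String> out) {
                        for (String s : l) { out.collect("left:" + s); }
                        for (String s : r) { out.collect("right:" + s); }
                    }
                })
                .print();

        env.execute("coGroup window sketch");
    }

    private static DataStream<String> withTimestamps(DataStream<String> stream) {
        // Toy extractor so the event-time window can fire; a real job derives this from the records.
        return stream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<String>() {
            @Override
            public long extractAscendingTimestamp(String element) {
                return 42000L;
            }
        });
    }
}
```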
[jira] [Commented] (FLINK-9559) The type of a union of CHAR columns of different lengths should be VARCHAR
[ https://issues.apache.org/jira/browse/FLINK-9559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590230#comment-16590230 ] ASF GitHub Bot commented on FLINK-9559: --- pnowojski commented on issue #6519: [FLINK-9559] [table] The type of a union of CHAR columns of different lengths should be VARCHAR URL: https://github.com/apache/flink/pull/6519#issuecomment-415420332 What does the SQL standard have to say about this? MySQL is not the most standard compliant db out there. Maybe the problem here is that string literals are of `CHAR(N)` type, while we do not support `CHAR` anywhere else? Maybe our literals should also have a type of `VARCHAR(N)`? That would also solve this problem. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > The type of a union of CHAR columns of different lengths should be VARCHAR > -- > > Key: FLINK-9559 > URL: https://issues.apache.org/jira/browse/FLINK-9559 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Hequn Cheng >Assignee: Hequn Cheng >Priority: Major > Labels: pull-request-available > > Currently, If the case-when expression has two branches which return string > literal, redundant white spaces will be appended to the short string literal. > For example, for the sql: case 1 when 1 then 'a' when 2 then 'bcd' end, the > return value will be 'a ' of CHAR(3) instead of 'a'. > Although, this follows the behavior in strict SQL standard mode(SQL:2003). We > should get the pragmatic return type in a real scenario without blank-padded. > Happily, this problem has been fixed by > [CALCITE-2321|https://issues.apache.org/jira/browse/CALCITE-2321], we can > upgrade calcite to the next release(1.17.0) and override > {{RelDataTypeSystem}} in flink to configure the return type, i.e., making > {{shouldConvertRaggedUnionTypesToVarying()}} return true. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] pnowojski commented on issue #6519: [FLINK-9559] [table] The type of a union of CHAR columns of different lengths should be VARCHAR
pnowojski commented on issue #6519: [FLINK-9559] [table] The type of a union of CHAR columns of different lengths should be VARCHAR URL: https://github.com/apache/flink/pull/6519#issuecomment-415420332 What does the SQL standard have to say about this? MySQL is not the most standard compliant db out there. Maybe the problem here is that string literals are of `CHAR(N)` type, while we do not support `CHAR` anywhere else? Maybe our literals should also have a type of `VARCHAR(N)`? That would also solve this problem. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-10203) Support truncate method for old Hadoop versions in HadoopRecoverableFsDataOutputStream
[ https://issues.apache.org/jira/browse/FLINK-10203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Artsem Semianenka updated FLINK-10203: -- Summary: Support truncate method for old Hadoop versions in HadoopRecoverableFsDataOutputStream (was: HadoopRecoverableFsDataOutputStream does not support Hadoop 2.6) > Support truncate method for old Hadoop versions in > HadoopRecoverableFsDataOutputStream > -- > > Key: FLINK-10203 > URL: https://issues.apache.org/jira/browse/FLINK-10203 > Project: Flink > Issue Type: Bug > Components: DataStream API, filesystem-connector >Affects Versions: 1.6.0, 1.6.1, 1.7.0 >Reporter: Artsem Semianenka >Priority: Major > > New StreamingFileSink ( introduced in 1.6 Flink version ) use > HadoopRecoverableFsDataOutputStream wrapper to write data in HDFS. > HadoopRecoverableFsDataOutputStream is an wrapper on FSDataOutputStream to > have an ability to restore from certain point of file after failure and > continue write data. To achive this recover functionality the > HadoopRecoverableFsDataOutputStream use "truncate" method which was > introduced only in Hadoop 2.7 . > Unfortently there are a few official Hadoop distibutives which latast version > still use Hadoop 2.6 (This distibutives: Cloudera, Pivotal HD ). As the > result Flink can't work with this distibutives. > Flink declares that supported Hadoop from version 2.4.0 upwards > (https://ci.apache.org/projects/flink/flink-docs-release-1.6/start/building.html#hadoop-versions) > I guess we should emulate the functionality of "truncate" method for older > Hadoop versions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10203) HadoopRecoverableFsDataOutputStream does not support Hadoop 2.6
[ https://issues.apache.org/jira/browse/FLINK-10203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Artsem Semianenka updated FLINK-10203: -- Summary: HadoopRecoverableFsDataOutputStream does not support Hadoop 2.6 (was: HadoopRecoverableWriter does not support Hadoop ) > HadoopRecoverableFsDataOutputStream does not support Hadoop 2.6 > --- > > Key: FLINK-10203 > URL: https://issues.apache.org/jira/browse/FLINK-10203 > Project: Flink > Issue Type: Bug > Components: DataStream API, filesystem-connector >Affects Versions: 1.6.0, 1.6.1, 1.7.0 >Reporter: Artsem Semianenka >Priority: Major > > New StreamingFileSink ( introduced in 1.6 Flink version ) use > HadoopRecoverableFsDataOutputStream wrapper to write data in HDFS. > HadoopRecoverableFsDataOutputStream is an wrapper on FSDataOutputStream to > have an ability to restore from certain point of file after failure and > continue write data. To achive this recover functionality the > HadoopRecoverableFsDataOutputStream use "truncate" method which was > introduced only in Hadoop 2.7 . > Unfortently there are a few official Hadoop distibutives which latast version > still use Hadoop 2.6 (This distibutives: Cloudera, Pivotal HD ). As the > result Flink can't work with this distibutives. > Flink declares that supported Hadoop from version 2.4.0 upwards > (https://ci.apache.org/projects/flink/flink-docs-release-1.6/start/building.html#hadoop-versions) > I guess we should emulate the functionality of "truncate" method for older > Hadoop versions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10203) HadoopRecoverableWriter does not support Hadoop
Artsem Semianenka created FLINK-10203: - Summary: HadoopRecoverableWriter does not support Hadoop Key: FLINK-10203 URL: https://issues.apache.org/jira/browse/FLINK-10203 Project: Flink Issue Type: Bug Components: DataStream API, filesystem-connector Affects Versions: 1.6.0, 1.6.1, 1.7.0 Reporter: Artsem Semianenka The new StreamingFileSink (introduced in Flink 1.6) uses the HadoopRecoverableFsDataOutputStream wrapper to write data to HDFS. HadoopRecoverableFsDataOutputStream is a wrapper around FSDataOutputStream that makes it possible to restore from a certain point of a file after a failure and continue writing data. To achieve this recovery functionality, HadoopRecoverableFsDataOutputStream uses the "truncate" method, which was only introduced in Hadoop 2.7. Unfortunately there are a few official Hadoop distributions whose latest versions still use Hadoop 2.6 (for example Cloudera and Pivotal HD). As a result, Flink can't work with these distributions, even though Flink declares that it supports Hadoop from version 2.4.0 upwards (https://ci.apache.org/projects/flink/flink-docs-release-1.6/start/building.html#hadoop-versions). I guess we should emulate the functionality of the "truncate" method for older Hadoop versions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] FredTing commented on a change in pull request #6105: [FLINK-8500] Get the timestamp of the Kafka message from kafka consumer
FredTing commented on a change in pull request #6105: [FLINK-8500] Get the timestamp of the Kafka message from kafka consumer URL: https://github.com/apache/flink/pull/6105#discussion_r212309444 ## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java ## @@ -45,6 +45,22 @@ */ T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException; + /** +* Deserializes the byte message. +* +* @param messageKey the key as a byte array (null if no key has been set). +* @param message The message, as a byte array (null if the message was empty or deleted). +* @param partition The partition the message has originated from. +* @param offset the offset of the message in the original source (for example the Kafka offset). +* @param timestamp the timestamp of the consumer record +* @param timestampType The timestamp type, could be NO_TIMESTAMP, CREATE_TIME or INGEST_TIME. +* +* @return The deserialized message as an object (null if the message cannot be deserialized). +*/ + default T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset, long timestamp, TimestampType timestampType) throws IOException { Review comment: I've pushed the changes as we discussed above. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590215#comment-16590215 ] ASF GitHub Bot commented on FLINK-8500: --- FredTing commented on a change in pull request #6105: [FLINK-8500] Get the timestamp of the Kafka message from kafka consumer URL: https://github.com/apache/flink/pull/6105#discussion_r212309444 ## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java ## @@ -45,6 +45,22 @@ */ T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException; + /** +* Deserializes the byte message. +* +* @param messageKey the key as a byte array (null if no key has been set). +* @param message The message, as a byte array (null if the message was empty or deleted). +* @param partition The partition the message has originated from. +* @param offset the offset of the message in the original source (for example the Kafka offset). +* @param timestamp the timestamp of the consumer record +* @param timestampType The timestamp type, could be NO_TIMESTAMP, CREATE_TIME or INGEST_TIME. +* +* @return The deserialized message as an object (null if the message cannot be deserialized). +*/ + default T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset, long timestamp, TimestampType timestampType) throws IOException { Review comment: I've pushed the changes as we discussed above. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher) > --- > > Key: FLINK-8500 > URL: https://issues.apache.org/jira/browse/FLINK-8500 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: yanxiaobin >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > Attachments: image-2018-01-30-14-58-58-167.png, > image-2018-01-31-10-48-59-633.png > > > The method deserialize of KeyedDeserializationSchema needs a parameter > 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios, > this is useful! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
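To illustrate the proposed API, here is a sketch of a schema that overrides the timestamp-aware overload shown in the diff above. It assumes the PR is applied (the seven-argument deserialize only exists with that change); the class name and the string formatting are made up for the example.

```java
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

import org.apache.kafka.common.record.TimestampType;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/** Sketch of a schema using the timestamp-aware deserialize(...) overload proposed in PR #6105. */
public class TimestampedStringSchema implements KeyedDeserializationSchema<String> {

    @Override
    public String deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) {
        // Old entry point, kept for compatibility; no timestamp is available here.
        return message == null ? null : new String(message, StandardCharsets.UTF_8);
    }

    @Override
    public String deserialize(byte[] messageKey, byte[] message, String topic, int partition,
                              long offset, long timestamp, TimestampType timestampType) throws IOException {
        // New overload from the PR: prefix every record with its Kafka timestamp.
        String value = deserialize(messageKey, message, topic, partition, offset);
        return timestampType + "@" + timestamp + ": " + value;
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
```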
[jira] [Commented] (FLINK-10198) Set Env object in DBOptions for RocksDB
[ https://issues.apache.org/jira/browse/FLINK-10198?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590108#comment-16590108 ] ASF GitHub Bot commented on FLINK-10198: NicoK commented on issue #6603: [FLINK-10198][state] Set Env object in DBOptions for RocksDB URL: https://github.com/apache/flink/pull/6603#issuecomment-415383492 In general, I like this idea, because before we would potentially end up with a log of additional RocksDB threads that might not be needed - they are all probably blocking on the same disk anyway! Did you measure the performance changes in some scenarios (1 RocksDB instances vs. 10 or even more) to verify that we are actually not making things worse with this change? Previously, with (for example) `FLASH_SSD_OPTIMIZED`, every RocksDB instance was using 4 threads and now all together will use 4? -> Should we tweak the default settings for the number of threads, also in the other profiles? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Set Env object in DBOptions for RocksDB > --- > > Key: FLINK-10198 > URL: https://issues.apache.org/jira/browse/FLINK-10198 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.7.0 >Reporter: Stefan Richter >Assignee: Stefan Richter >Priority: Major > Labels: pull-request-available > > I think we should consider to always set a default environment when we create > the DBOptions. > See https://github.com/facebook/rocksdb/wiki/rocksdb-basics: > *Support for Multiple Embedded Databases in the same process* > A common use-case for RocksDB is that applications inherently partition their > data set into logical partitions or shards. This technique benefits > application load balancing and fast recovery from faults. This means that a > single server process should be able to operate multiple RocksDB databases > simultaneously. This is done via an environment object named Env. Among other > things, a thread pool is associated with an Env. If applications want to > share a common thread pool (for background compactions) among multiple > database instances, then it should use the same Env object for opening those > databases. > Similarly, multiple database instances may share the same block cache. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] NicoK commented on issue #6603: [FLINK-10198][state] Set Env object in DBOptions for RocksDB
NicoK commented on issue #6603: [FLINK-10198][state] Set Env object in DBOptions for RocksDB URL: https://github.com/apache/flink/pull/6603#issuecomment-415383492 In general, I like this idea, because before we would potentially end up with a lot of additional RocksDB threads that might not be needed - they are all probably blocking on the same disk anyway! Did you measure the performance changes in some scenarios (1 RocksDB instance vs. 10 or even more) to verify that we are actually not making things worse with this change? Previously, with (for example) `FLASH_SSD_OPTIMIZED`, every RocksDB instance was using 4 threads and now all together will use 4? -> Should we tweak the default settings for the number of threads, also in the other profiles? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-9391) Support UNNEST in Table API
[ https://issues.apache.org/jira/browse/FLINK-9391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590055#comment-16590055 ] Mikhail commented on FLINK-9391: Hello [~twalthr], As far as I know, [~ipatina] is on another project, so I'd like to continue Alina's work. I already found out what happens if we want to select with unnest from a table: {code:java} @Test def testUnnest(): Unit = { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env, config) val data = new mutable.MutableList[(Int, Long, Array[String])] data.+=((1, 1L, Array("Hi", "w"))) data.+=((2, 2L, Array("Hello", "k"))) data.+=((3, 2L, Array("Hello world", "x"))) val input = env.fromCollection(Random.shuffle(data)).toTable(tEnv).as('a, 'b, 'c) val unnested = input.select('a, 'b, 'c.unnest()) val actual = unnested.toDataSet[Row].collect() val expected = List("1,Hi", "1,w", "2,Hello", "2,k", "3,Hello world", "3,x").mkString("\n") TestBaseUtils.compareResultAsText(actual.asJava, expected) } {code} Code for a function that processes the input data is then generated in DataSetCalc#translateToPlan, compiled, and executed. That code is generated with CommonCalc#generateFunction, which processes each element of a Row separately, so the final output for a Row in the table will contain a single Row. But for our UNNEST case we need a different flow. [~twalthr], could you please suggest where to look? > Support UNNEST in Table API > --- > > Key: FLINK-9391 > URL: https://issues.apache.org/jira/browse/FLINK-9391 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Alina Ipatina >Priority: Major > > FLINK-6033 introduced the UNNEST function for SQL. We should also add this > function to the Table API to keep the APIs in sync. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
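For context, a minimal sketch of what the SQL-side UNNEST (added by FLINK-6033) looks like on a batch table similar to the one in the test above. It assumes the batch SQL runner of this version accepts UNNEST and that the registered field names `a`/`c` match; it is an illustration, not code from the ticket.

```java
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.types.Row;

import java.util.Arrays;

public class SqlUnnestExample {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        TypeInformation<Row> rowType = new RowTypeInfo(
                BasicTypeInfo.INT_TYPE_INFO,
                ObjectArrayTypeInfo.getInfoFor(String[].class, BasicTypeInfo.STRING_TYPE_INFO));

        DataSet<Row> data = env.fromCollection(
                Arrays.asList(
                        Row.of(1, new String[]{"Hi", "w"}),
                        Row.of(2, new String[]{"Hello", "k"})),
                rowType);

        tEnv.registerDataSet("T", data, "a, c");

        // SQL already expands arrays via UNNEST (FLINK-6033); the ticket is about
        // offering the same operation in the Table API.
        Table unnested = tEnv.sqlQuery("SELECT a, s FROM T CROSS JOIN UNNEST(c) AS A (s)");
        tEnv.toDataSet(unnested, Row.class).print();
    }
}
```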
[GitHub] zentol closed pull request #6607: [docs] Remove the redundant tag
zentol closed pull request #6607: [docs] Remove the redundant tag URL: https://github.com/apache/flink/pull/6607 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md index f1afa4e93b2..a93853c14dc 100644 --- a/docs/monitoring/metrics.md +++ b/docs/monitoring/metrics.md @@ -1223,7 +1223,7 @@ Thus, in order to infer the metric identifier: Histogram - Task + Task numBytesInLocal The total number of bytes this task has read from a local source. Counter @@ -1244,7 +1244,6 @@ Thus, in order to infer the metric identifier: Meter - Task numBuffersInLocal The total number of network buffers this task has read from a local source. Counter This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-9713) Support versioned joins in planning phase
[ https://issues.apache.org/jira/browse/FLINK-9713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590035#comment-16590035 ] ASF GitHub Bot commented on FLINK-9713: --- pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins URL: https://github.com/apache/flink/pull/6299#discussion_r212257550 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TableVersionFunction.scala ## @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.functions + +import java.sql.Timestamp + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.table.api.Table +import org.apache.flink.types.Row + +/** + * Class represnting table version functions over some history table. + */ +class TableVersionFunction private( +@transient private val _table: Table, +private[flink] val versionField: String, +private[flink] val primaryKey: String, +private[flink] val resultType: RowTypeInfo, +private[flink] val isProctime: Boolean) + extends TableFunction[Row] { + + def eval(row: Timestamp): Unit = { Review comment: Without it I'm getting: ``` org.apache.flink.table.api.ValidationException: Function class 'org.apache.flink.table.functions.TemporalTableFunction' does not implement at least one method named 'eval' which is public, not abstract and (in case of table functions) not static. ``` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Support versioned joins in planning phase > - > > Key: FLINK-9713 > URL: https://issues.apache.org/jira/browse/FLINK-9713 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.5.0 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Major > Labels: pull-request-available > > Queries like: > {code:java} > SELECT > o.amount * r.rate > FROM > Orders AS o, > LATERAL TABLE (Rates(o.rowtime)) AS r > WHERE o.currency = r.currency{code} > should evaluate to valid plan with versioned joins plan node. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590017#comment-16590017 ] ASF GitHub Bot commented on FLINK-8500: --- tzulitai commented on a change in pull request #6105: [FLINK-8500] Get the timestamp of the Kafka message from kafka consumer URL: https://github.com/apache/flink/pull/6105#discussion_r212252904 ## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java ## @@ -45,6 +45,22 @@ */ T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException; + /** +* Deserializes the byte message. +* +* @param messageKey the key as a byte array (null if no key has been set). +* @param message The message, as a byte array (null if the message was empty or deleted). +* @param partition The partition the message has originated from. +* @param offset the offset of the message in the original source (for example the Kafka offset). +* @param timestamp the timestamp of the consumer record +* @param timestampType The timestamp type, could be NO_TIMESTAMP, CREATE_TIME or INGEST_TIME. +* +* @return The deserialized message as an object (null if the message cannot be deserialized). +*/ + default T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset, long timestamp, TimestampType timestampType) throws IOException { Review comment: I would be ok with proceeding with the above. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher) > --- > > Key: FLINK-8500 > URL: https://issues.apache.org/jira/browse/FLINK-8500 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: yanxiaobin >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > Attachments: image-2018-01-30-14-58-58-167.png, > image-2018-01-31-10-48-59-633.png > > > The method deserialize of KeyedDeserializationSchema needs a parameter > 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios, > this is useful! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9061) add entropy to s3 path for better scalability
[ https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590003#comment-16590003 ] ASF GitHub Bot commented on FLINK-9061: --- StephanEwen commented on issue #6604: [FLINK-9061] Optionally add entropy to checkpoint paths better S3 scalability URL: https://github.com/apache/flink/pull/6604#issuecomment-415362329 Good points - these were actually problems in the original PR as well... Will try to come up with a solution for that... This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > add entropy to s3 path for better scalability > - > > Key: FLINK-9061 > URL: https://issues.apache.org/jira/browse/FLINK-9061 > Project: Flink > Issue Type: Bug > Components: FileSystem, State Backends, Checkpointing >Affects Versions: 1.5.0, 1.4.2 >Reporter: Jamie Grier >Assignee: Indrajit Roychoudhury >Priority: Critical > Labels: pull-request-available > > I think we need to modify the way we write checkpoints to S3 for high-scale > jobs (those with many total tasks). The issue is that we are writing all the > checkpoint data under a common key prefix. This is the worst case scenario > for S3 performance since the key is used as a partition key. > > In the worst case checkpoints fail with a 500 status code coming back from S3 > and an internal error type of TooBusyException. > > One possible solution would be to add a hook in the Flink filesystem code > that allows me to "rewrite" paths. For example say I have the checkpoint > directory set to: > > s3://bucket/flink/checkpoints > > I would hook that and rewrite that path to: > > s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original > path > > This would distribute the checkpoint write load around the S3 cluster evenly. > > For reference: > https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/ > > Any other people hit this issue? Any other ideas for solutions? This is a > pretty serious problem for people trying to checkpoint to S3. > > -Jamie > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
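The rewrite described in the issue, prefixing the checkpoint path with a hash of the original path so that writes fan out over S3's key-based partitions, can be sketched as follows. The helper name and hashing choice are assumptions made for illustration; this is not the API added by the pull request.

```java
/**
 * Illustrative sketch of the path rewrite described in the issue:
 * s3://bucket/flink/checkpoints -> s3://bucket/<hash>/flink/checkpoints.
 * The class and method are hypothetical, not Flink's actual entropy-injection API.
 */
public final class EntropyPathSketch {

    public static String injectEntropy(String checkpointUri) {
        // Split the scheme and bucket ("s3://bucket") from the key ("/flink/checkpoints").
        int schemeEnd = checkpointUri.indexOf("://") + 3;
        int keyStart = checkpointUri.indexOf('/', schemeEnd);
        String bucketPart = checkpointUri.substring(0, keyStart);
        String keyPart = checkpointUri.substring(keyStart);

        // Any stable hash of the original key works; the point is to vary the
        // leading characters of the key so S3 spreads the write load across partitions.
        String entropy = String.format("%08x", keyPart.hashCode());

        return bucketPart + "/" + entropy + keyPart;
    }

    public static void main(String[] args) {
        // Prints something like: s3://bucket/<8 hex chars>/flink/checkpoints
        System.out.println(injectEntropy("s3://bucket/flink/checkpoints"));
    }
}
```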
[GitHub] kisimple opened a new pull request #6607: [docs] Remove the redundant tag
kisimple opened a new pull request #6607: [docs] Remove the redundant tag URL: https://github.com/apache/flink/pull/6607 Remove the redundant `` tag in https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#io This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16589904#comment-16589904 ] ASF GitHub Bot commented on FLINK-8500: --- aljoscha commented on a change in pull request #6105: [FLINK-8500] Get the timestamp of the Kafka message from kafka consumer URL: https://github.com/apache/flink/pull/6105#discussion_r21385 ## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java ## @@ -45,6 +45,22 @@ */ T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException; + /** +* Deserializes the byte message. +* +* @param messageKey the key as a byte array (null if no key has been set). +* @param message The message, as a byte array (null if the message was empty or deleted). +* @param partition The partition the message has originated from. +* @param offset the offset of the message in the original source (for example the Kafka offset). +* @param timestamp the timestamp of the consumer record +* @param timestampType The timestamp type, could be NO_TIMESTAMP, CREATE_TIME or INGEST_TIME. +* +* @return The deserialized message as an object (null if the message cannot be deserialized). +*/ + default T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset, long timestamp, TimestampType timestampType) throws IOException { Review comment: Yes, to unblock this I thing we can go with this approach, basically the schema becomes this: ``` @PublicEvolving public interface KeyedDeserializationSchema extends Serializable, ResultTypeQueryable { @Deprecated default T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) { throw new RuntimeException("blammo"); } default T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset, long timestamp) { return deserialize(/* call the other method */); } boolean isEndOfStream(T nextElement); } ``` With this, if you have an existing implementation of `KeyedDeserializationSchema` it will continue to work without any changes. If you implement a new one you have to implement one of the methods, otherwise the exception is thrown. And all Flink code only calls the version that takes the timestamp. What do you think? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher) > --- > > Key: FLINK-8500 > URL: https://issues.apache.org/jira/browse/FLINK-8500 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: yanxiaobin >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > Attachments: image-2018-01-30-14-58-58-167.png, > image-2018-01-31-10-48-59-633.png > > > The method deserialize of KeyedDeserializationSchema needs a parameter > 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios, > this is useful! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
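Written against the interface shape sketched in the comment above, an existing user schema keeps compiling unchanged, while a new schema only needs the timestamp-aware method. A hedged example of such a new implementation follows; the class name and produced type are made up for illustration and do not come from the pull request.

```java
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;

/**
 * Hypothetical schema written against the timestamp-aware default method
 * sketched in the comment above; the deprecated five-argument variant is
 * never implemented, so existing call sites are unaffected.
 */
public class TimestampedStringSchema implements KeyedDeserializationSchema<Tuple2<Long, String>> {

    @Override
    public Tuple2<Long, String> deserialize(
            byte[] messageKey, byte[] message, String topic, int partition, long offset, long timestamp) {
        // Pair the Kafka record timestamp with the decoded payload.
        String value = message == null ? null : new String(message, StandardCharsets.UTF_8);
        return Tuple2.of(timestamp, value);
    }

    @Override
    public boolean isEndOfStream(Tuple2<Long, String> nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Tuple2<Long, String>> getProducedType() {
        return new TupleTypeInfo<>(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
    }
}
```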
[jira] [Updated] (FLINK-10202) Enable configuration for state.checkpoint.dir in StreamExecutionEnvironment
[ https://issues.apache.org/jira/browse/FLINK-10202?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] buptljy updated FLINK-10202: Description: Usually we can set state.checkpoint.dir in flink-conf.yaml, but sometimes we run a flink job locally, and we're not able to set state.checkpoint.dir for background wrapped cluster, which will cause {code:java} throw new IllegalStateException("CheckpointConfig says to persist periodic " + "checkpoints, but no checkpoint directory has been configured. You can " + "configure configure one via key '" + ConfigConstants.CHECKPOINTS_DIRECTORY_KEY + "'."); {code} I wonder if we could provide a public method in *StreamExecutionEnvironment* so that developers can use it to set state.checkpoint.dir for job. was: Usually we can set state.checkpoint.dir in flink-conf.yaml, but sometimes we run a flink job locally, and we're not able to set state.checkpoint.dir, which will cause {code:java} throw new IllegalStateException("CheckpointConfig says to persist periodic " + "checkpoints, but no checkpoint directory has been configured. You can " + "configure configure one via key '" + ConfigConstants.CHECKPOINTS_DIRECTORY_KEY + "'."); {code} I wonder if we could provide a public method in *StreamExecutionEnvironment* so that developers can use it to set state.checkpoint.dir for job. > Enable configuration for state.checkpoint.dir in StreamExecutionEnvironment > --- > > Key: FLINK-10202 > URL: https://issues.apache.org/jira/browse/FLINK-10202 > Project: Flink > Issue Type: Improvement >Reporter: buptljy >Priority: Major > > Usually we can set state.checkpoint.dir in flink-conf.yaml, but sometimes we > run a flink job locally, and we're not able to set state.checkpoint.dir for > background wrapped cluster, which will cause > {code:java} > throw new IllegalStateException("CheckpointConfig says to persist periodic " + > "checkpoints, but no checkpoint directory has been configured. You can > " + > "configure configure one via key '" + > ConfigConstants.CHECKPOINTS_DIRECTORY_KEY + "'."); > {code} > I wonder if we could provide a public method in *StreamExecutionEnvironment* > so that developers can use it to set state.checkpoint.dir for job. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
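As a stop-gap until such a setter exists, a locally launched job can usually be given a checkpoint directory programmatically by configuring a file-system state backend on the environment. The sketch below shows that workaround under those assumptions; it is not the API proposed in the issue, and the path is a placeholder.

```java
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalCheckpointDirWorkaround {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Periodic checkpoints every 10 seconds for this job.
        env.enableCheckpointing(10_000L);

        // Pointing a file-system state backend at a URI gives the locally
        // started cluster a checkpoint directory without editing
        // flink-conf.yaml. The path below is a placeholder.
        env.setStateBackend(new FsStateBackend("file:///tmp/flink-checkpoints"));

        env.fromElements(1, 2, 3).print();
        env.execute("local-checkpoint-dir-workaround");
    }
}
```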
[jira] [Updated] (FLINK-10202) Enable configuration for state.checkpoint.dir in StreamExecutionEnvironment
[ https://issues.apache.org/jira/browse/FLINK-10202?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] buptljy updated FLINK-10202: Summary: Enable configuration for state.checkpoint.dir in StreamExecutionEnvironment (was: Enable configuration for state.checkpoint.dir in environment) > Enable configuration for state.checkpoint.dir in StreamExecutionEnvironment > --- > > Key: FLINK-10202 > URL: https://issues.apache.org/jira/browse/FLINK-10202 > Project: Flink > Issue Type: Improvement >Reporter: buptljy >Priority: Major > > Usually we can set state.checkpoint.dir in flink-conf.yaml, but sometimes we > run a flink job locally, and we're not able to set state.checkpoint.dir, > which will cause > {code:java} > throw new IllegalStateException("CheckpointConfig says to persist periodic " + > "checkpoints, but no checkpoint directory has been configured. You can > " + > "configure configure one via key '" + > ConfigConstants.CHECKPOINTS_DIRECTORY_KEY + "'."); > {code} > I wonder if we could provide a public method in *StreamExecutionEnvironment* > so that developers can use it to set state.checkpoint.dir for job. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10202) Enable configuration for state.checkpoint.dir with environment
[ https://issues.apache.org/jira/browse/FLINK-10202?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] buptljy updated FLINK-10202: Description: Usually we can set state.checkpoint.dir in flink-conf.yaml, but sometimes we run a flink job locally, and we're not able to set state.checkpoint.dir, which will cause {code:java} throw new IllegalStateException("CheckpointConfig says to persist periodic " + "checkpoints, but no checkpoint directory has been configured. You can " + "configure configure one via key '" + ConfigConstants.CHECKPOINTS_DIRECTORY_KEY + "'."); {code} I wonder if we could provide a public method in *StreamExecutionEnvironment* so that developers can use it to set state.checkpoint.dir for job. was: Usually we can set state.checkpoint.dir in flink-conf.yaml, but sometimes we run a flink job locally, and we're not able to set state.checkpoint.dir, which will cause {code:java} throw new IllegalStateException("CheckpointConfig says to persist periodic " + "checkpoints, but no checkpoint directory has been configured. You can " + "configure configure one via key '" + ConfigConstants.CHECKPOINTS_DIRECTORY_KEY + "'."); {code} > Enable configuration for state.checkpoint.dir with environment > -- > > Key: FLINK-10202 > URL: https://issues.apache.org/jira/browse/FLINK-10202 > Project: Flink > Issue Type: Improvement >Reporter: buptljy >Priority: Major > > Usually we can set state.checkpoint.dir in flink-conf.yaml, but sometimes we > run a flink job locally, and we're not able to set state.checkpoint.dir, > which will cause > {code:java} > throw new IllegalStateException("CheckpointConfig says to persist periodic " + > "checkpoints, but no checkpoint directory has been configured. You can > " + > "configure configure one via key '" + > ConfigConstants.CHECKPOINTS_DIRECTORY_KEY + "'."); > {code} > I wonder if we could provide a public method in *StreamExecutionEnvironment* > so that developers can use it to set state.checkpoint.dir for job. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10202) Enable configuration for state.checkpoint.dir in environment
[ https://issues.apache.org/jira/browse/FLINK-10202?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] buptljy updated FLINK-10202: Summary: Enable configuration for state.checkpoint.dir in environment (was: Enable configuration for state.checkpoint.dir with environment) > Enable configuration for state.checkpoint.dir in environment > > > Key: FLINK-10202 > URL: https://issues.apache.org/jira/browse/FLINK-10202 > Project: Flink > Issue Type: Improvement >Reporter: buptljy >Priority: Major > > Usually we can set state.checkpoint.dir in flink-conf.yaml, but sometimes we > run a flink job locally, and we're not able to set state.checkpoint.dir, > which will cause > {code:java} > throw new IllegalStateException("CheckpointConfig says to persist periodic " + > "checkpoints, but no checkpoint directory has been configured. You can > " + > "configure configure one via key '" + > ConfigConstants.CHECKPOINTS_DIRECTORY_KEY + "'."); > {code} > I wonder if we could provide a public method in *StreamExecutionEnvironment* > so that developers can use it to set state.checkpoint.dir for job. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10202) Enable configuration for state.checkpoint.dir with environment
buptljy created FLINK-10202: --- Summary: Enable configuration for state.checkpoint.dir with environment Key: FLINK-10202 URL: https://issues.apache.org/jira/browse/FLINK-10202 Project: Flink Issue Type: Improvement Reporter: buptljy Usually we can set state.checkpoint.dir in flink-conf.yaml, but sometimes we run a flink job locally, and we're not able to set state.checkpoint.dir, which will cause {code:java} throw new IllegalStateException("CheckpointConfig says to persist periodic " + "checkpoints, but no checkpoint directory has been configured. You can " + "configure configure one via key '" + ConfigConstants.CHECKPOINTS_DIRECTORY_KEY + "'."); {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9693) Possible memory leak in jobmanager retaining archived checkpoints
[ https://issues.apache.org/jira/browse/FLINK-9693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16589897#comment-16589897 ] ASF GitHub Bot commented on FLINK-9693: --- TisonKun commented on issue #6251: [FLINK-9693] Set Execution#taskRestore to null after deployment URL: https://github.com/apache/flink/pull/6251#issuecomment-415333090 Thanks till, this saves my weekend :-) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Possible memory leak in jobmanager retaining archived checkpoints > - > > Key: FLINK-9693 > URL: https://issues.apache.org/jira/browse/FLINK-9693 > Project: Flink > Issue Type: Bug > Components: JobManager, State Backends, Checkpointing >Affects Versions: 1.5.0, 1.6.0 > Environment: !image.png!!image (1).png! >Reporter: Steven Zhen Wu >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.4.3, 1.5.3, 1.6.1, 1.7.0 > > Attachments: 20180725_jm_mem_leak.png, > 41K_ExecutionVertex_objs_retained_9GB.png, ExecutionVertexZoomIn.png > > > First, some context about the job > * Flink 1.4.1 > * stand-alone deployment mode > * embarrassingly parallel: all operators are chained together > * parallelism is over 1,000 > * stateless except for Kafka source operators. checkpoint size is 8.4 MB. > * set "state.backend.fs.memory-threshold" so that only jobmanager writes to > S3 to checkpoint > * internal checkpoint with 10 checkpoints retained in history > > Summary of the observations > * 41,567 ExecutionVertex objects retained 9+ GB of memory > * Expanded in one ExecutionVertex. it seems to storing the kafka offsets for > source operator -- This message was sent by Atlassian JIRA (v7.6.3#76005)