[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r188349853 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchema.java --- @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.specific.SpecificRecord; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.io.ObjectInputStream; + +/** + * Deserialization schema that deserializes from Avro binary format using {@link SchemaCoder}. + * + * @param type of record it produces + */ +public class RegistryAvroDeserializationSchema extends AvroDeserializationSchema { + private final SchemaCoder.SchemaCoderProvider schemaCoderProvider; + private transient SchemaCoder schemaCoder; + + /** +* Creates Avro deserialization schema that reads schema from input stream using provided {@link SchemaCoder}. +* +* @param recordClazz class to which deserialize. Should be either +*{@link SpecificRecord} or {@link GenericRecord}. +* @param reader reader's Avro schema. Should be provided if recordClazz is +*{@link GenericRecord} +* @param schemaCoderProvider schema provider that allows instantiation of {@link SchemaCoder} that will be used for +*schema reading +*/ + protected RegistryAvroDeserializationSchema( + Class recordClazz, + @Nullable Schema reader, + SchemaCoder.SchemaCoderProvider schemaCoderProvider) { + super(recordClazz, reader); --- End diff -- Empty line / indentation ---
[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r188321914 --- Diff: flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroDeserializationSchema.java --- @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro.registry.confluent; + +import org.apache.flink.formats.avro.AvroDeserializationSchema; +import org.apache.flink.formats.avro.RegistryAvroDeserializationSchema; +import org.apache.flink.formats.avro.SchemaCoder; + +import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.specific.SpecificRecord; + +import javax.annotation.Nullable; + +/** + * Deserialization schema that deserializes from Avro binary format using {@link SchemaCoder} that uses + * Confluent Schema Registry. + * + * @param type of record it produces + */ +public class ConfluentRegistryAvroDeserializationSchema + extends RegistryAvroDeserializationSchema { + + private static final int DEFAULT_IDENTITY_MAP_CAPACITY = 1000; + + /** +* Creates a Avro deserialization schema. +* +* @param recordClazz class to which deserialize. Should be either +*{@link SpecificRecord} or {@link GenericRecord}. +* @param reader reader's Avro schema. Should be provided if recordClazz is +*{@link GenericRecord} +* @param schemaCoderProvider provider for schema coder that reads writer schema from Confluent Schema Registry +*/ + private ConfluentRegistryAvroDeserializationSchema( + Class recordClazz, + @Nullable Schema reader, + SchemaCoder.SchemaCoderProvider schemaCoderProvider) { + super(recordClazz, reader, schemaCoderProvider); + } + + /** +* Creates {@link ConfluentRegistryAvroDeserializationSchema} that produces {@link GenericRecord} +* using provided reader schema and looks up writer schema in Confluent Schema Registry. +* +* @param schema schema of produced records +* @param urlurl of schema registry to connect +* @return deserialized record in form of {@link GenericRecord} +*/ + public static ConfluentRegistryAvroDeserializationSchema forGeneric( + Schema schema, String url) { + return forGeneric(schema, url, DEFAULT_IDENTITY_MAP_CAPACITY); + } + + /** +* Creates {@link ConfluentRegistryAvroDeserializationSchema} that produces {@link GenericRecord} +* using provided reader schema and looks up writer schema in Confluent Schema Registry. 
+* +* @param schema schema of produced records +* @param url url of schema registry to connect +* @param identityMapCapacity maximum number of cached schema versions (default: 1000) +* @return deserialized record in form of {@link GenericRecord} +*/ + public static ConfluentRegistryAvroDeserializationSchema forGeneric( + Schema schema, String url, int identityMapCapacity) { + return new ConfluentRegistryAvroDeserializationSchema<>( + GenericRecord.class, + schema, + new CachedSchemaCoderProvider(url, identityMapCapacity)); + } + + /** +* Creates {@link AvroDeserializationSchema} that produces classes that were generated from avro +* schema and looks up writer schema in Confluent Schema Registry. +* +* @param tClass class of record to be produced +* @param urlurl of schema registry to connect +* @return deserialized record
[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r188355756 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java --- @@ -275,9 +304,19 @@ else if (configSnapshot instanceof AvroSerializerConfigSnapshot) { // Utilities // + private static boolean isGenericRecord(Class type) { + return !SpecificRecord.class.isAssignableFrom(type) && + GenericRecord.class.isAssignableFrom(type); + } + @Override public TypeSerializer duplicate() { - return new AvroSerializer<>(type); + if (schemaString != null) { + return new AvroSerializer<>(type, new Schema.Parser().parse(schemaString)); --- End diff -- Duplication happens frequently, would be good to avoid schema parsing. You can add a private copy constructor that takes class and string. ---
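For illustration, a hedged sketch of the copy-constructor idea from this comment (class and member names are simplified placeholders, not the PR's actual code); the point is that `duplicate()` forwards the already-serialized schema string instead of running a `Schema.Parser` on every call:

```java
import org.apache.avro.Schema;

// Sketch only: a private copy constructor that avoids re-parsing the schema.
public class AvroSerializerSketch<T> {

	private final Class<T> type;
	private final String schemaString;

	public AvroSerializerSketch(Class<T> type, Schema schema) {
		this.type = type;
		this.schemaString = schema != null ? schema.toString() : null;
	}

	/** Copy constructor: forwards the schema string without re-parsing it. */
	private AvroSerializerSketch(Class<T> type, String schemaString) {
		this.type = type;
		this.schemaString = schemaString;
	}

	/** duplicate() no longer needs to invoke a Schema.Parser on every call. */
	public AvroSerializerSketch<T> duplicate() {
		return new AvroSerializerSketch<>(type, schemaString);
	}
}
```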
[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r188355390 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java --- @@ -99,9 +108,29 @@ /** * Creates a new AvroSerializer for the type indicated by the given class. +* This constructor is intended to be used with {@link SpecificRecord} or reflection serializer. +* For serializing {@link GenericData.Record} use {@link AvroSerializer#AvroSerializer(Class, Schema)} */ public AvroSerializer(Class type) { + Preconditions.checkArgument(!isGenericRecord(type), --- End diff -- Minor: Other preconditions checks in this class are done by statically imported methods. While this is not consistent within the code base, I would suggest to keep this consistent within a class as much as possible. ---
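For context, the "statically imported methods" style referred to above would look roughly like this (an illustrative sketch, not code from the PR):

```java
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.specific.SpecificRecord;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;

// Sketch of the statically imported precondition style.
class PreconditionStyleSketch<T> {

	private final Class<T> type;

	PreconditionStyleSketch(Class<T> type) {
		checkArgument(!isGenericRecord(type),
			"For GenericData.Record, use the constructor that also takes the Schema.");
		this.type = checkNotNull(type, "type must not be null");
	}

	private static boolean isGenericRecord(Class<?> type) {
		return !SpecificRecord.class.isAssignableFrom(type)
			&& GenericRecord.class.isAssignableFrom(type);
	}
}
```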
[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r188348636 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java --- @@ -0,0 +1,215 @@ +/* --- End diff -- That would mean initializing it all lazily, in `getDatumReader()` or in a `checkAvroInitialized()` method. ---
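A sketch of the lazy-initialization pattern mentioned here; the helper name `checkAvroInitialized()` is taken from the comment, everything else (field names, the plain `ByteArrayInputStream`) is an assumption for illustration:

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificRecord;

import java.io.ByteArrayInputStream;

// Sketch only: build all transient members on first use instead of in readObject().
class LazyAvroInitSketch<T> {

	private final Class<T> recordClazz;
	private final Schema readerSchema;

	private transient GenericDatumReader<T> datumReader;
	private transient ByteArrayInputStream inputStream;
	private transient Decoder decoder;

	LazyAvroInitSketch(Class<T> recordClazz, Schema readerSchema) {
		this.recordClazz = recordClazz;
		this.readerSchema = readerSchema;
	}

	/** Lazily initializes the Avro runtime objects on the first deserialize() call. */
	private void checkAvroInitialized() {
		if (datumReader != null) {
			return;
		}
		this.inputStream = new ByteArrayInputStream(new byte[0]);
		this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
		if (SpecificRecord.class.isAssignableFrom(recordClazz)) {
			this.datumReader = new SpecificDatumReader<>(readerSchema);
		} else {
			this.datumReader = new GenericDatumReader<>(readerSchema);
		}
	}

	GenericDatumReader<T> getDatumReader() {
		checkAvroInitialized();
		return datumReader;
	}
}
```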
[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r188328437 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java --- @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* --- End diff -- Double Apache header ---
[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r188337378 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java --- @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.formats.avro.typeutils.AvroTypeInfo; +import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo; +import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream; +import org.apache.flink.util.Preconditions; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.Decoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.reflect.ReflectData; +import org.apache.avro.reflect.ReflectDatumReader; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.specific.SpecificRecord; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.io.ObjectInputStream; + +/** + * Deserialization schema that deserializes from Avro binary format. + * + * @param type of record it produces + */ +public class AvroDeserializationSchema implements DeserializationSchema { --- End diff -- This should have a serial version UID. You can activate the respective inspections in IntelliJ to warn about such issues. ---
[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r188336725 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java --- @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.formats.avro.typeutils.AvroTypeInfo; +import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo; +import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream; +import org.apache.flink.util.Preconditions; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.Decoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.reflect.ReflectData; +import org.apache.avro.reflect.ReflectDatumReader; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.specific.SpecificRecord; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.io.ObjectInputStream; + +/** + * Deserialization schema that deserializes from Avro binary format. + * + * @param type of record it produces + */ +public class AvroDeserializationSchema implements DeserializationSchema { + + /** +* Class to deserialize to. +*/ + private Class recordClazz; --- End diff -- All fields should be final whenever possible - immutability as the default choice. That acts both as documentation about the writer's intention and makes it future proof. ---
[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r188350196 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchema.java --- @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.specific.SpecificRecord; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.io.ObjectInputStream; + +/** + * Deserialization schema that deserializes from Avro binary format using {@link SchemaCoder}. + * + * @param type of record it produces + */ +public class RegistryAvroDeserializationSchema extends AvroDeserializationSchema { + private final SchemaCoder.SchemaCoderProvider schemaCoderProvider; + private transient SchemaCoder schemaCoder; + + /** +* Creates Avro deserialization schema that reads schema from input stream using provided {@link SchemaCoder}. +* +* @param recordClazz class to which deserialize. Should be either +*{@link SpecificRecord} or {@link GenericRecord}. +* @param reader reader's Avro schema. Should be provided if recordClazz is +*{@link GenericRecord} +* @param schemaCoderProvider schema provider that allows instantiation of {@link SchemaCoder} that will be used for +*schema reading +*/ + protected RegistryAvroDeserializationSchema( + Class recordClazz, + @Nullable Schema reader, + SchemaCoder.SchemaCoderProvider schemaCoderProvider) { + super(recordClazz, reader); + this.schemaCoderProvider = schemaCoderProvider; + this.schemaCoder = schemaCoderProvider.get(); + } + + @Override + public T deserialize(byte[] message) { + // read record + try { + getInputStream().setBuffer(message); + Schema writerSchema = schemaCoder.readSchema(getInputStream()); + Schema readerSchema = getReaderSchema(); + + GenericDatumReader datumReader = getDatumReader(); + + datumReader.setSchema(writerSchema); + datumReader.setExpected(readerSchema); + + return datumReader.read(null, getDecoder()); + } catch (Exception e) { + throw new RuntimeException("Failed to deserialize Row.", e); + } + } + + private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException { --- End diff -- See above, would suggest to avoid readObject() ---
[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r188353074 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java --- @@ -79,15 +87,16 @@ // runtime fields, non-serializable, lazily initialized --- - private transient SpecificDatumWriter writer; - private transient SpecificDatumReader reader; + private transient GenericDatumWriter writer; + private transient GenericDatumReader reader; private transient DataOutputEncoder encoder; private transient DataInputDecoder decoder; - private transient SpecificData avroData; + private transient GenericData avroData; private transient Schema schema; + private final String schemaString; --- End diff -- As per the comments, the existing code orders config fields before runtime fields. Can you place the schema to match that pattern? ---
[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r188328926 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java --- @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.formats.avro.typeutils.AvroTypeInfo; +import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo; +import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream; +import org.apache.flink.util.Preconditions; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.Decoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.reflect.ReflectData; +import org.apache.avro.reflect.ReflectDatumReader; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.specific.SpecificRecord; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.io.ObjectInputStream; + +/** + * Deserialization schema that deserializes from Avro binary format. + * + * @param type of record it produces + */ +public class AvroDeserializationSchema implements DeserializationSchema { + + /** +* Class to deserialize to. +*/ + private Class recordClazz; + + private String schemaString = null; --- End diff -- I would avoid null initialization, it is redundant. It actually does nothing (fields are null anyways) but acually exists as byte code, hence costs cpu cycles. ---
[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r188349785 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchema.java --- @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.specific.SpecificRecord; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.io.ObjectInputStream; + +/** + * Deserialization schema that deserializes from Avro binary format using {@link SchemaCoder}. + * + * @param type of record it produces + */ +public class RegistryAvroDeserializationSchema extends AvroDeserializationSchema { + private final SchemaCoder.SchemaCoderProvider schemaCoderProvider; --- End diff -- We typically do empty lines between class declaration and members. ---
[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r188338897 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java --- @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.formats.avro.typeutils.AvroTypeInfo; +import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo; +import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream; +import org.apache.flink.util.Preconditions; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.Decoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.reflect.ReflectData; +import org.apache.avro.reflect.ReflectDatumReader; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.specific.SpecificRecord; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.io.ObjectInputStream; + +/** + * Deserialization schema that deserializes from Avro binary format. + * + * @param type of record it produces + */ +public class AvroDeserializationSchema implements DeserializationSchema { + + /** +* Class to deserialize to. +*/ + private Class recordClazz; + + private String schemaString = null; --- End diff -- You can make this variable final as well. ---
[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r188347289 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java --- @@ -0,0 +1,215 @@ +/* --- End diff -- This class uses a mixture of eager initialization of transient members (in readObject()) and lazy initialization (in getDatumReader()). I would suggest to do it all one way or the other. My suggestion would be to avoid `readObject()` whenever possible. If you encounter an exception during schema parsing (and it may be something weird from Jackson, like a missing manifest due to a shading issue), you will get the most unhelpful exception stack trace ever, in the weirdest place (like Flink's RPC message decoder). In my experience, when a user sees such a stack trace, they are rarely able to diagnose that. Best case they show up on the mailing list, worst case they give up. ---
[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r188319278 --- Diff: flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroDeserializationSchema.java --- @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro.registry.confluent; + +import org.apache.flink.formats.avro.AvroDeserializationSchema; +import org.apache.flink.formats.avro.RegistryAvroDeserializationSchema; +import org.apache.flink.formats.avro.SchemaCoder; + +import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.specific.SpecificRecord; + +import javax.annotation.Nullable; + +/** + * Deserialization schema that deserializes from Avro binary format using {@link SchemaCoder} that uses + * Confluent Schema Registry. + * + * @param type of record it produces + */ +public class ConfluentRegistryAvroDeserializationSchema + extends RegistryAvroDeserializationSchema { + + private static final int DEFAULT_IDENTITY_MAP_CAPACITY = 1000; + + /** +* Creates a Avro deserialization schema. +* +* @param recordClazz class to which deserialize. Should be either +*{@link SpecificRecord} or {@link GenericRecord}. +* @param reader reader's Avro schema. Should be provided if recordClazz is +*{@link GenericRecord} +* @param schemaCoderProvider provider for schema coder that reads writer schema from Confluent Schema Registry +*/ + private ConfluentRegistryAvroDeserializationSchema( + Class recordClazz, + @Nullable Schema reader, + SchemaCoder.SchemaCoderProvider schemaCoderProvider) { + super(recordClazz, reader, schemaCoderProvider); --- End diff -- For such situations, code in the method and parameter list should use different indentation, or be separated by an empty line. Otherwise makes it hard to parse. ---
[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r188349118 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java --- @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.formats.avro.typeutils.AvroTypeInfo; +import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo; +import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream; +import org.apache.flink.util.Preconditions; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.Decoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.reflect.ReflectData; +import org.apache.avro.reflect.ReflectDatumReader; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.specific.SpecificRecord; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.io.ObjectInputStream; + +/** + * Deserialization schema that deserializes from Avro binary format. + * + * @param type of record it produces + */ +public class AvroDeserializationSchema implements DeserializationSchema { + + /** +* Class to deserialize to. +*/ + private Class recordClazz; + + private String schemaString = null; + + /** +* Reader that deserializes byte array into a record. +*/ + private transient GenericDatumReader datumReader; + + /** +* Input stream to read message from. +*/ + private transient MutableByteArrayInputStream inputStream; + + /** +* Avro decoder that decodes binary data. +*/ + private transient Decoder decoder; + + /** +* Avro schema for the reader. 
+*/ + private transient Schema reader; + + /** +* Creates a Avro deserialization schema. +* +* @param recordClazz class to which deserialize. Should be one of: +*{@link org.apache.avro.specific.SpecificRecord}, +*{@link org.apache.avro.generic.GenericRecord}. +* @param reader reader's Avro schema. Should be provided if recordClazz is +*{@link GenericRecord} +*/ + AvroDeserializationSchema(Class recordClazz, @Nullable Schema reader) { + Preconditions.checkNotNull(recordClazz, "Avro record class must not be null."); + this.recordClazz = recordClazz; + this.inputStream = new MutableByteArrayInputStream(); + this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null); + t
[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r188329273 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java --- @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.formats.avro.typeutils.AvroTypeInfo; +import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo; +import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream; +import org.apache.flink.util.Preconditions; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.Decoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.reflect.ReflectData; +import org.apache.avro.reflect.ReflectDatumReader; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.specific.SpecificRecord; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.io.ObjectInputStream; + +/** + * Deserialization schema that deserializes from Avro binary format. + * + * @param type of record it produces + */ +public class AvroDeserializationSchema implements DeserializationSchema { + + /** --- End diff -- Minor comment: Many newer classes pick up a style where the JavaDocs of fields are in one line, to make the fields section a bit more compact: ``` /** Class to deserialize to. */ private Class recordClazz; ``` ---
[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r188319368 --- Diff: flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroDeserializationSchema.java --- @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro.registry.confluent; + +import org.apache.flink.formats.avro.AvroDeserializationSchema; +import org.apache.flink.formats.avro.RegistryAvroDeserializationSchema; +import org.apache.flink.formats.avro.SchemaCoder; + +import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.specific.SpecificRecord; + +import javax.annotation.Nullable; + +/** + * Deserialization schema that deserializes from Avro binary format using {@link SchemaCoder} that uses + * Confluent Schema Registry. + * + * @param type of record it produces + */ +public class ConfluentRegistryAvroDeserializationSchema + extends RegistryAvroDeserializationSchema { + + private static final int DEFAULT_IDENTITY_MAP_CAPACITY = 1000; + + /** +* Creates a Avro deserialization schema. +* +* @param recordClazz class to which deserialize. Should be either +*{@link SpecificRecord} or {@link GenericRecord}. +* @param reader reader's Avro schema. Should be provided if recordClazz is +*{@link GenericRecord} +* @param schemaCoderProvider provider for schema coder that reads writer schema from Confluent Schema Registry +*/ + private ConfluentRegistryAvroDeserializationSchema( + Class recordClazz, + @Nullable Schema reader, + SchemaCoder.SchemaCoderProvider schemaCoderProvider) { + super(recordClazz, reader, schemaCoderProvider); + } + + /** +* Creates {@link ConfluentRegistryAvroDeserializationSchema} that produces {@link GenericRecord} +* using provided reader schema and looks up writer schema in Confluent Schema Registry. +* +* @param schema schema of produced records +* @param urlurl of schema registry to connect +* @return deserialized record in form of {@link GenericRecord} +*/ + public static ConfluentRegistryAvroDeserializationSchema forGeneric( + Schema schema, String url) { + return forGeneric(schema, url, DEFAULT_IDENTITY_MAP_CAPACITY); --- End diff -- Same as above ---
[GitHub] flink issue #5982: [FLINK-9325][checkpoint]generate the meta file for checkp...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5982 My gut feeling is that we don't need `WriteMode.OVERWRITE` in cases where one wants such an atomic file creation... ---
[GitHub] flink issue #6016: [FLINK-8933] Avoid calling Class#newInstance(part 2)
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/6016 I am not yet convinced that this fixes an actual problem, while it is not clear whether it may cause a regression (exception traces or performance). To me this is a case where we should not make this change, but rather spend the effort on other issues. ---
[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r188316052 --- Diff: flink-formats/flink-avro-confluent-registry/pom.xml --- @@ -0,0 +1,94 @@ + + +http://maven.apache.org/POM/4.0.0"; +xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; +xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd";> + + flink-formats + org.apache.flink + 1.6-SNAPSHOT + + 4.0.0 + + flink-avro-confluent-registry --- End diff -- That is a good question - maybe eventually. We could leave it in `flink-formats` for now until we have a case to create `flink-catalogs`. This is also not a full-blown catalog support, as for the Table API, but something much simpler - just multiple Avro Schemas. ---
[GitHub] flink issue #5982: [FLINK-9325][checkpoint]generate the meta file for checkp...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5982 Good point about the renaming on `close()` in case close is called for cleanup, rather than success. We could follow the same semantics as in [CheckpointStateOutputStream](https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java#L61). There the semantics are:
- `close()` means "close on error / cleanup" and closes the stream and deletes the temp file.
- `closeAndPublish()` would mean "close on success" and closes the stream and renames the file.
- After `closeAndPublish()` has been called, `close()` becomes a no-op.

The [FsCheckpointMetadataOutputStream](https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointMetadataOutputStream.java) implements that pattern; I think it worked well and is easy to use. ---
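The described contract, as a rough shape (a simplified sketch; the real classes are the ones linked above):

```java
import java.io.IOException;
import java.io.OutputStream;

// Sketch only: the close()/closeAndPublish() contract described in the comment.
abstract class PublishOnSuccessStreamSketch extends OutputStream {

	private boolean published;

	/** "Close on success": flush, close the temp file, then atomically publish it. */
	public void closeAndPublish() throws IOException {
		flush();
		closeInternal();
		publishTempFile();   // the data becomes visible only now
		published = true;
	}

	/** "Close on error / cleanup": closes the stream and deletes the temp file. */
	@Override
	public void close() throws IOException {
		if (!published) {    // becomes a no-op once closeAndPublish() succeeded
			closeInternal();
			deleteTempFile();
		}
	}

	protected abstract void closeInternal() throws IOException;

	protected abstract void publishTempFile() throws IOException;

	protected abstract void deleteTempFile() throws IOException;
}
```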
[GitHub] flink issue #6016: [FLINK-8933] Avoid calling Class#newInstance(part 2)
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/6016 I am not sure we should be doing it like this. The point of discouraging `Class.newInstance()` is not to copy-replace it with a different call. That will not magically make anything better. The whole point is to go from a method that may sneakily throw undeclared exceptions to a method that declares its exceptions. All places that do not require a change to the try/catch statement (adding `InvocationTargetException`) have reasonable exception handling in place already and are not vulnerable to the motivating issue. In fact, for now this makes things worse, because exceptions get wrapped into `InvocationTargetException`, with the root cause in `getTargetException()`, not in the exception chain. That would actually be a serious regression.

As a more general comment: copy/replace style pull requests are a bit tricky, in my opinion - they are rather easy to create (bulk fix tool) and require committers to put in significant time to dig into the matter and to double check that it does not cause a regression in the individual spots that are changed. One can probably keep all committers busy for a year with just such fixes without really changing anything for the better for users, and with a high risk of regressions. I would personally vote to not have many of those pull requests, or to clearly label them as "cosmetic" and give them a very low priority in reviewing. Especially given that we have a decent backlog of pull requests for changes where users will see a difference. ---
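To make the exception-handling concern concrete, a hedged sketch of what such a replacement would have to do so that the constructor's original failure stays visible to the caller (illustrative only, not code from the PR; `FlinkException` is Flink's generic checked exception):

```java
import org.apache.flink.util.FlinkException;

import java.lang.reflect.InvocationTargetException;

// Sketch only: replacing Class.newInstance() without hiding the real failure.
final class InstantiationSketch {

	private InstantiationSketch() {}

	static <T> T instantiate(Class<T> clazz) throws FlinkException {
		try {
			return clazz.getDeclaredConstructor().newInstance();
		} catch (InvocationTargetException e) {
			// unwrap: the interesting failure is the constructor's exception,
			// not the reflective wrapper around it
			throw new FlinkException("Could not instantiate " + clazz.getName(), e.getTargetException());
		} catch (ReflectiveOperationException e) {
			throw new FlinkException("Could not instantiate " + clazz.getName(), e);
		}
	}
}
```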
[GitHub] flink issue #6002: [FLINK-9350] Parameter baseInterval has wrong check messa...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/6002 Good fix, thanks. +1 to merge ---
[GitHub] flink issue #5995: [FLINK-9337] Implemented AvroDeserializationSchema
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5995 Thanks, the main code looks good! Unfortunately, this seems to either break compatibility with prior savepoints (when Avro types were implicitly handled through Kryo, now bridged through the `BackwardsCompatibleAvroSerializer`) or needs to adjust that test. There are also some license header issues, causing the build to fail. ---
[GitHub] flink issue #5982: [FLINK-9325][checkpoint]generate the meta file for checkp...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5982 Thanks for preparing this. I looked at the `TwoPhraseFSDataOutputStream` - maybe we can make this simpler. Do we need the distinction between phases? Is it not enough to behave as a regular stream, just overriding `close()` to do `super.close()` + `rename()`? That may be enough. When the stream is closed, all the writing methods fail with a "stream closed" exception anyway. Also, we need this method to be implemented in all FileSystem subclasses. Typos: "Phrase" --> "Phase" ---
[GitHub] flink issue #5963: [FLINK-9305][s3] also register flink-s3-fs-hadoop's facto...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5963 Looks good, thanks. +1 to merge ---
[GitHub] flink issue #5897: [FLINK-9234] [table] Fix missing dependencies for externa...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5897 Agreed, let's add it to master as well... ---
[GitHub] flink issue #5970: [FLINK-9292] [core] Remove TypeInfoParser (part 2)
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5970 Looks good, thanks. +1 to merge ---
[GitHub] flink issue #5834: [FLINK-9153] TaskManagerRunner should support rpc port ra...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5834 Committers usually have a lot of different responsibilities (releases, testing, helping users on mailing lists, working on roadmap features, etc.). All that takes a lot of time. Reviewing PRs is one important part, and we try to do this well, but with so many users now, it is not always perfect. One big problem is that very few committers actually take the time to look at external contributions. It might help to not always ping the same people (for example @zentol , @tillrohrmann , me, etc.) but to also ping some other committers. Here is a list of other committers; it is not quite complete, as some newer ones are not yet listed: http://flink.apache.org/community.html#people Hope that helps you understand... ---
[GitHub] flink issue #6000: [FLINK-9299] [Documents] ProcessWindowFunction documentat...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/6000 Congrats on having PR number 6000! This overlaps with #6001, which is a bit more comprehensive (but needs some improvements). Would you be up for coordinating to make one joint PR? ---
[GitHub] flink pull request #6000: [FLINK-9299] [Documents] ProcessWindowFunction doc...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/6000#discussion_r187798729 --- Diff: docs/dev/stream/operators/windows.md --- @@ -730,9 +730,9 @@ input /* ... */ -public class MyProcessWindowFunction implements ProcessWindowFunction, String, String, TimeWindow> { +public static class MyProcessWindowFunction extends ProcessWindowFunction, String, String, TimeWindow> { --- End diff -- In other code samples, we don't put the `static`, not assuming that this is defined as an inner class ---
[GitHub] flink issue #5999: [FLINK-9348] [Documentation] scalastyle documentation for...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5999 This is helpful, thanks. Merging... ---
[GitHub] flink issue #5966: [FLINK-9312] [security] Add mutual authentication for RPC...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5966 I agree, we need different key/truststores for the internal/external connectivity. This PR was meant as a step in that direction, separating at least within the SSL Utils the internal and external context setup. In your thinking, is there ever a case for a different internal authentication method than "single trusted certificate"? What if we were not tied to Akka? (Side note: I think for internal communication, 'authentication is authorization' is probably reasonable, because there are no different users/roles for internal communication). Would you assume that internally, we never do hostname verification? ---
[GitHub] flink pull request #5970: [FLINK-9292] [core] Remove TypeInfoParser (part 2)
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5970#discussion_r187797400 --- Diff: flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java --- @@ -62,6 +62,10 @@ @SuppressWarnings("serial") public class UdfAnalyzerTest { + private static TypeInformation<Tuple2<String, Integer>> stringIntTuple2TypeInfo = TypeInformation.of(new TypeHint<Tuple2<String, Integer>>(){}); --- End diff -- These fields are constants, so they should be final. Can you add the modifier and rename the fields to match the naming convention? ---
[GitHub] flink issue #5970: [FLINK-9292] [core] Remove TypeInfoParser (part 2)
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5970 One minor style comment, otherwise this is good to go! ---
[GitHub] flink issue #5897: [FLINK-9234] [table] Fix missing dependencies for externa...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5897 From my side, +1 to merge this to `release-1.4` and `release-1.5`. ---
[GitHub] flink issue #5897: [FLINK-9234] [table] Fix missing dependencies for externa...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5897 Forwarding some comment from @fhueske from JIRA: > I tried to reproduce this issue for 1.5 but it seems to work. > > Flink 1.5 should be out soon (release candidate 2 was published two days ago). We can merge a fix for 1.4, but would need to wait for 1.4.3 to be released before it is publicly available (unless you build from the latest 1.4 branch). > > `commons-configuration` is used for the external catalog support that was recently reworked for the unified table source generation. The code that needs the dependency was deprecated. I think we can drop the code and dependency for the 1.6 release. That means we should merge this into `release-1.4` and `release-1.5`. In `master`, we could merge this, but should probably simply drop the pre-unified-source code. ---
[GitHub] flink issue #5982: [FLINK-9325][checkpoint]generate the meta file for checkp...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5982 How about adding the method `createAtomically` or so, with otherwise the same signature as the `create(Path, WriteMode)` method? ---
[GitHub] flink issue #5735: [FLINK-9036] [core] Add default values to State Descripto...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5735 @aljoscha Would be interested in your opinion here. This is basically one of two ways to improve the handling of default values:
1. Add a default value supplier on the state descriptor (this approach). The advantage is that you can use this to handle the previous cases of default values in a backwards-compatible way (including the starting value for folding state).
2. Add a `T getOrDefault(Supplier<T>)` method to `ValueState`. This might be almost simpler to do, and more flexible, as it allows for different default values in different contexts. This can get inefficient, though, when users naively create an anonymous class for the supplier (probably not a big deal any more since lambdas), and it breaks with the current approach, meaning we would have two different ways for default values that need to work together, one of which is deprecated but still needs to be supported until Flink 2.0. ---
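To make option 2 concrete, a hypothetical sketch of what such an API and its call site could look like (nothing here is existing Flink API; names are placeholders):

```java
import java.io.IOException;
import java.util.function.Supplier;

// Hypothetical sketch of a getOrDefault(Supplier) addition, for discussion only.
interface ValueStateSketch<T> {

	T value() throws IOException;

	/** Returns the current value, or the supplied default if no value is set yet. */
	default T getOrDefault(Supplier<T> defaultSupplier) throws IOException {
		T current = value();
		return current != null ? current : defaultSupplier.get();
	}
}

class DefaultValueExample {
	long countOrZero(ValueStateSketch<Long> countState) throws IOException {
		// a lambda keeps the per-call overhead low compared to an anonymous class
		return countState.getOrDefault(() -> 0L);
	}
}
```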
[GitHub] flink pull request #5977: [FLINK-9295][kafka] Fix transactional.id collision...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5977#discussion_r187639708 --- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java --- @@ -837,7 +837,7 @@ public void initializeState(FunctionInitializationContext context) throws Except nextTransactionalIdHintState = context.getOperatorStateStore().getUnionListState( NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR); transactionalIdsGenerator = new TransactionalIdsGenerator( - getRuntimeContext().getTaskName(), + getRuntimeContext().getTaskName() + "-" + getRuntimeContext().getOperatorUniqueID(), --- End diff -- You could probably use only the Operator ID here - the task name does not add to the uniqueness. Unless the purpose of the task name is "human readability" in log files or metrics. ---
[GitHub] flink pull request #5977: [FLINK-9295][kafka] Fix transactional.id collision...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5977#discussion_r187640541 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java --- @@ -71,6 +71,17 @@ @PublicEvolving MetricGroup getMetricGroup(); + /** +* Returned value is guaranteed to be unique between operators within the same job and to be +* stable and the same across job submissions. +* +* This operation is currently only supported in Streaming (DataStream) contexts. +* +* @return String representation of the operator's unique id. +*/ + @PublicEvolving + String getOperatorUniqueID(); --- End diff -- Rather than adding this here and failing for *DataSet* programs, how about adding this to `StreamingRuntimeContext` and casting inside the Kafka Producer? Not super pretty, but nicer than having something that looks like a pretty generic concept (operator id) throwing an exception in a whole class of programs (batch jobs). This problem should go away anyways with the batch / streaming unification later. ---
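A rough sketch of the suggested cast, assuming the new `getOperatorUniqueID()` method from the diff above were moved to `StreamingRuntimeContext` as proposed; the helper class and error message below are purely illustrative:

```java
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;

final class OperatorIdAccess {

    // Only streaming contexts would expose the operator's unique ID; batch programs
    // never reach this path, so no generic-looking API has to throw for DataSet jobs.
    static String operatorUniqueId(RuntimeContext context) {
        if (context instanceof StreamingRuntimeContext) {
            return ((StreamingRuntimeContext) context).getOperatorUniqueID();
        }
        throw new UnsupportedOperationException(
                "The operator unique ID is only available in DataStream programs.");
    }
}
```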
[GitHub] flink issue #5973: [FLINK-9261][ssl] Fix SSL support for REST API and Web UI...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5973 Merged in `master` in 3afa5eb3c47158086ab29012a835f96682a85d34 ---
[GitHub] flink issue #5982: [FLINK-9325][checkpoint]generate the meta file for checkp...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5982 I think this fix here might not work for S3, because a rename() with the S3 file systems will actually trigger a copy (or even a download and upload), so it is not a cheap operation. We can fix this by adding a `create(...)` method (or mode) to the FileSystem API that does not publish data in the file until `close()` is called. For hdfs:// and file://, this would use a temp file with renaming; for S3 we don't write to a temp file, because S3 makes the file only visible on close() anyways. ---
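As an illustration of the temp-file-plus-rename flavor described above for hdfs:// and file:// style systems (the class name and structure are assumptions for this sketch, not part of Flink's FileSystem API; the S3 flavor would skip the temp file entirely):

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Sketch: data written here only becomes visible under the target path on close(),
// so the existence of the target file reliably signals that its contents are complete.
public class RenameOnCloseOutputStream extends OutputStream {

    private final Path targetFile;
    private final Path tempFile;
    private final OutputStream out;

    public RenameOnCloseOutputStream(Path targetFile) throws IOException {
        this.targetFile = targetFile;
        this.tempFile = targetFile.resolveSibling(targetFile.getFileName() + ".inprogress");
        this.out = Files.newOutputStream(tempFile);
    }

    @Override
    public void write(int b) throws IOException {
        out.write(b);
    }

    @Override
    public void close() throws IOException {
        out.close();
        // Publish atomically: the incomplete temp file is never visible under the target name.
        Files.move(tempFile, targetFile, StandardCopyOption.ATOMIC_MOVE);
    }
}
```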
[GitHub] flink pull request #5970: [FLINK-9292] [core] Remove TypeInfoParser (part 2)
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5970#discussion_r187610790 --- Diff: flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java --- @@ -109,7 +112,7 @@ public String map(MyPojo value) throws Exception { @Test public void testForwardWithGenericTypePublicAttrAccess() { compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map4.class, - "org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo", "String"); + TypeInformation.of(new TypeHint>(){}), Types.STRING); --- End diff -- Then the code needs to be `new GenericTypeInfo<>(MyPojo.class)`. Otherwise it will try to determine what the TypeInfo for `GenericType` is, and it just happens to be a generic type ;-) ---
[GitHub] flink issue #5962: [FLINK-9304] Timer service shutdown should not stop if in...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5962 The comment to make at least one attempt sounds good. If you refactor this a little bit, you can actually test it. Either make it a static method to which you pass the timer service, or better, make it a method `shutdownUninterruptible(Deadline, Logger)` on the TimerService itself. ---
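A minimal sketch of such a `shutdownUninterruptible` helper, using a plain `ExecutorService` as a stand-in for the timer service and a millisecond deadline instead of Flink's `Deadline` type (all names here are illustrative, not the PR's actual code):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;

final class TimerServiceShutdown {

    // Makes at least one shutdown attempt even if the deadline has already passed,
    // keeps retrying until the deadline, and never aborts just because the calling
    // thread was interrupted (the interrupt flag is restored at the end).
    static void shutdownUninterruptible(ExecutorService timerService, long deadlineMillis, Logger log) {
        boolean interrupted = false;
        boolean terminated = false;
        do {
            timerService.shutdownNow();
            try {
                terminated = timerService.awaitTermination(100, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                interrupted = true;
            }
        } while (!terminated && System.currentTimeMillis() < deadlineMillis);

        if (!terminated) {
            log.warn("Timer service did not terminate before the shutdown deadline.");
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
```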
[GitHub] flink pull request #5970: [FLINK-9292] [core] Remove TypeInfoParser (part 2)
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5970#discussion_r187591783 --- Diff: flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java --- @@ -109,7 +112,7 @@ public String map(MyPojo value) throws Exception { @Test public void testForwardWithGenericTypePublicAttrAccess() { compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map4.class, - "org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo", "String"); + TypeInformation.of(new TypeHint>(){}), Types.STRING); --- End diff -- The `GenericTypeInfo` here seems to be not correct. ---
[GitHub] flink pull request #5970: [FLINK-9292] [core] Remove TypeInfoParser (part 2)
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5970#discussion_r187589977 --- Diff: flink-core/src/test/java/org/apache/flink/api/common/typeinfo/TypeInformationTest.java --- @@ -61,7 +61,7 @@ public void testOfGenericClassForGenericType() { @Test public void testOfTypeHint() { assertEquals(BasicTypeInfo.STRING_TYPE_INFO, TypeInformation.of(String.class)); - assertEquals(BasicTypeInfo.STRING_TYPE_INFO, TypeInformation.of(new TypeHint(){})); + assertEquals(BasicTypeInfo.STRING_TYPE_INFO, Types.STRING); --- End diff -- Please undo this one, the test explicitly tests the TypeHint use. ---
[GitHub] flink pull request #5970: [FLINK-9292] [core] Remove TypeInfoParser (part 2)
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5970#discussion_r187592909 --- Diff: flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java --- @@ -268,8 +274,8 @@ public void testForwardIntoNestedTupleWithVarAndModification() { @Test public void testForwardIntoTupleWithAssignment() { - compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map14.class, "Tuple2", - "Tuple2"); + compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map14.class, + TypeInformation.of(new TypeHint>(){}), TypeInformation.of(new TypeHint>(){})); --- End diff -- There is so much use of the types `Tuple2>` and `Tuple2`, it would make sense to factor these out into static fields and reference them from there. ---
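For example, such constants could look roughly like this (the exact tuple type parameters were elided in the thread above, so the ones chosen here are only placeholders):

```java
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;

public class UdfAnalyzerTestTypes {

    // Built once and reused by the individual tests instead of re-creating the
    // TypeInformation from a TypeHint at every call site.
    static final TypeInformation<Tuple2<String, Integer>> STRING_INT_TUPLE2_TYPE_INFO =
            TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {});

    static final TypeInformation<Tuple2<String, String>> STRING_STRING_TUPLE2_TYPE_INFO =
            TypeInformation.of(new TypeHint<Tuple2<String, String>>() {});
}
```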
[GitHub] flink issue #5987: [FLINK-9043][CLI]Automatically search for the last succes...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5987 Sorry, but can we take back a step and first agree on what we actually want the behavior to be before jumping into an implementation? ---
[GitHub] flink issue #5958: [FLINK-8500] Get the timestamp of the Kafka message from ...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5958 If it turns out that we need to do a bit more design work on the deserialization schema, we could incrementally fix the issue that triggered this PR by actually extending the KeyedDeserializationSchema with a new default method. ---
[GitHub] flink issue #5958: [FLINK-8500] Get the timestamp of the Kafka message from ...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5958 There are a few bigger design aspects that we need to agree upon: - The `DeserializationSchema` is a shared common denominator of serialization schemata. That's why it is in `flink-core` and not in a Kafka connector project. It is used by various per-record streaming sources, like Kafka, RabbitMQ, in the future PubSub, or AMQ. It may be valuable to migrate Kinesis to it as well. This PR changes the common denominator to have very Kafka-specific fields. - The common schema makes sense because we can offer a library of implementations, like for Avro, Json, Thrift, Protobuf. All connectors can use hardened implementations for these types, or integrations with schema registries. - This surfaces for example in the SQL / Table API, which is currently making an effort to change its source API to have "connector" and "format" aspects orthogonal. You define a table as "from kafka with Avro", or "from row-wise file with JSON", etc. - We should think of having something similar in the future in the unified batch/streaming DataStream API as well, when we rework our source interface. At least a "row-wise source" that can then use all these format implementations. That means we are in a bit of a conflict between a "common deserialization schema" interface and surfacing connector-specific information. One way to approach that might be making the connector-specific deserializer classes subclasses of the common one, and letting them use specialized subclasses of ConsumerRecordMetaInfo that have the additional fields. On a separate note, I think that `ConsumerRecordMetaInfo` is not the best name, because the type has not only the meta info, but the actual record. So we could call it `Record` or `Datum` or `SourceRecord`, etc. ---
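One hedged sketch of the subclassing direction mentioned above — every type name below is hypothetical and only illustrates the idea of a connector-neutral record type with a Kafka-specific subclass:

```java
import java.io.IOException;
import java.io.Serializable;

// Connector-neutral deserialization interface, kept free of Kafka-specific fields.
interface RecordDeserializationSchema<T> extends Serializable {
    T deserialize(SourceRecord record) throws IOException;
}

// Minimal common record: just the raw payload.
class SourceRecord {
    private final byte[] value;

    SourceRecord(byte[] value) {
        this.value = value;
    }

    byte[] value() {
        return value;
    }
}

// Connector-specific subclass carrying Kafka-only metadata; a Kafka deserialization
// schema could declare this narrower type and still satisfy the common interface.
class KafkaSourceRecord extends SourceRecord {
    private final String topic;
    private final int partition;
    private final long offset;
    private final long timestamp;

    KafkaSourceRecord(byte[] value, String topic, int partition, long offset, long timestamp) {
        super(value);
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.timestamp = timestamp;
    }

    String topic() { return topic; }
    int partition() { return partition; }
    long offset() { return offset; }
    long timestamp() { return timestamp; }
}
```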
[GitHub] flink issue #5973: [FLINK-9261][ssl] Fix SSL support for REST API and Web UI...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5973 Merging this... ---
[GitHub] flink pull request #5980: [FLINK-9324] Wait for slot release before completi...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5980#discussion_r187191338 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java --- @@ -159,10 +159,51 @@ public SlotSharingGroupId getSlotSharingGroupId() { * the logical slot. * * @param cause of the payload release -* @return true if the logical slot's payload could be released, otherwise false */ @Override - public boolean release(Throwable cause) { - return releaseSlot(cause).isDone(); + public void release(Throwable cause) { + if (STATE_UPDATER.compareAndSet(this, State.ALIVE, State.RELEASING)) { + signalPayloadRelease(cause); + } + state = State.RELEASED; + releaseFuture.complete(null); + } + + private CompletableFuture signalPayloadRelease(Throwable cause) { + tryAssignPayload(TERMINATED_PAYLOAD); + payload.fail(cause); + + return payload.getTerminalStateFuture(); + } + + private void returnSlotToOwner(CompletableFuture terminalStateFuture) { + final CompletableFuture slotReturnFuture = terminalStateFuture.handle((Object ignored, Throwable throwable) -> { + if (state == State.RELEASING) { --- End diff -- What happens if this gets set concurrently to `RELEASED`? This would only work if `slotOwner.returnAllocatedSlot(this)` works in both cases (releasing and released) and the second path (returning the completed future) is the optimization/fast path if it is already released. (double checking the assumption). ---
[GitHub] flink pull request #5980: [FLINK-9324] Wait for slot release before completi...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5980#discussion_r187189887 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java --- @@ -159,10 +159,51 @@ public SlotSharingGroupId getSlotSharingGroupId() { * the logical slot. * * @param cause of the payload release -* @return true if the logical slot's payload could be released, otherwise false */ @Override - public boolean release(Throwable cause) { - return releaseSlot(cause).isDone(); + public void release(Throwable cause) { + if (STATE_UPDATER.compareAndSet(this, State.ALIVE, State.RELEASING)) { + signalPayloadRelease(cause); + } + state = State.RELEASED; + releaseFuture.complete(null); + } + + private CompletableFuture signalPayloadRelease(Throwable cause) { + tryAssignPayload(TERMINATED_PAYLOAD); + payload.fail(cause); + + return payload.getTerminalStateFuture(); + } + + private void returnSlotToOwner(CompletableFuture terminalStateFuture) { + final CompletableFuture slotReturnFuture = terminalStateFuture.handle((Object ignored, Throwable throwable) -> { + if (state == State.RELEASING) { + return slotOwner.returnAllocatedSlot(this); + } else { + return CompletableFuture.completedFuture(true); + } + }).thenCompose(Function.identity()); + + slotReturnFuture.whenComplete( + (Object ignored, Throwable throwable) -> { + state = State.RELEASED; --- End diff -- Mutating a private member field from within a lambda - would it make sense to have a (package) private method `markReleased()` or so? ---
[GitHub] flink issue #5978: [FLINK-8554] Upgrade AWS SDK
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5978 Before merging this, we need to - diff the dependency trees between the versions - clear all licenses for the dependency changes. put changes into the NOTICE files - check that no unshaded classes are added to the jar ---
[GitHub] flink issue #5975: [FLINK-9138][docs][tests] Make ConfigOptionsDocsCompleten...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5975 +1 ---
[GitHub] flink pull request #5973: [FLINK-9261][ssl] Fix SSL support for REST API and...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5973#discussion_r187025268 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java --- @@ -81,16 +85,62 @@ public static void setSSLVerAndCipherSuites(ServerSocket socket, Configuration c } } + /** +* Creates a {@link SSLEngineFactory} to be used by the Server. +* +* @param config The application configuration. +*/ + public static SSLEngineFactory createServerSSLEngineFactory(final Configuration config) throws Exception { + return createSSLEngineFactory(config, false); + } + + /** +* Creates a {@link SSLEngineFactory} to be used by the Client. +* @param config The application configuration. +*/ + public static SSLEngineFactory createClientSSLEngineFactory(final Configuration config) throws Exception { + return createSSLEngineFactory(config, true); + } + + private static SSLEngineFactory createSSLEngineFactory( + final Configuration config, + final boolean clientMode) throws Exception { + + final SSLContext sslContext = clientMode ? + createSSLClientContext(config) : + createSSLServerContext(config); + + checkState(sslContext != null, "%s it not enabled", SecurityOptions.SSL_ENABLED.key()); + + return new SSLEngineFactory( + sslContext, + getEnabledProtocols(config), + getEnabledCipherSuites(config), + clientMode); + } + /** * Sets SSL version and cipher suites for SSLEngine. -* @param engine -*SSLEngine to be handled -* @param config -*The application configuration +* +* @param engine SSLEngine to be handled +* @param config The application configuration +* @deprecated Use {@link #createClientSSLEngineFactory(Configuration)} or +* {@link #createServerSSLEngineFactory(Configuration)}. */ + @Deprecated public static void setSSLVerAndCipherSuites(SSLEngine engine, Configuration config) { - engine.setEnabledProtocols(config.getString(SecurityOptions.SSL_PROTOCOL).split(",")); - engine.setEnabledCipherSuites(config.getString(SecurityOptions.SSL_ALGORITHMS).split(",")); + engine.setEnabledProtocols(getEnabledProtocols(config)); + engine.setEnabledCipherSuites(getEnabledCipherSuites(config)); + } + + private static String[] getEnabledProtocols(final Configuration config) { + requireNonNull(config, "config must not be null"); --- End diff -- For private internal utilities, I suggest to skip the null check in most places, especially when it will eagerly fail with an exception on null anyways. In any case, if you believe the check should be there, please use `Preconditions.checkNotNull` rather than `requireNonNull`. ---
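For reference, the Flink-style precondition the comment refers to looks roughly like this (the wrapping method is only for illustration):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Preconditions;

final class ConfigChecks {

    // Preferred over java.util.Objects.requireNonNull in Flink code,
    // mainly for consistency across the code base.
    static Configuration checkedConfig(Configuration config) {
        return Preconditions.checkNotNull(config, "config must not be null");
    }
}
```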
[GitHub] flink issue #5972: [FLINK-9323][build] Properly organize checkstyle-plugin c...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5972 +1 to the cleanup, but I think this might be breaking relative paths of the suppression files, hence causing the build to fail. Probably need to make the paths relative to "root dir" or some "highest project dir" variable. ---
[GitHub] flink issue #5936: [FLINK-9265] Upgrade Prometheus version
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5936 Looks good, thanks. +1 to merge this ---
[GitHub] flink pull request #5966: [FLINK-9312] [security] Add mutual authentication ...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5966#discussion_r186800971 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java --- @@ -170,8 +171,8 @@ public void initChannel(SocketChannel channel) throws Exception { localAddress = (InetSocketAddress) bindFuture.channel().localAddress(); - long end = System.currentTimeMillis(); - LOG.info("Successful initialization (took {} ms). Listening on SocketAddress {}.", (end - start), bindFuture.channel().localAddress().toString()); + final long duration = (start - System.nanoTime()) / 1_000_000; + LOG.info("Successful initialization (took {} ms). Listening on SocketAddress {}.", duration, localAddress); --- End diff -- You are absolutely right, thanks for catching this! ---
[GitHub] flink issue #5966: [FLINK-9312] [security] Add mutual authentication for RPC...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5966 @EronWright This might be interesting to you as well ---
[GitHub] flink pull request #5966: [FLINK-9312] [security] Add mutual authentication ...
GitHub user StephanEwen opened a pull request: https://github.com/apache/flink/pull/5966 [FLINK-9312] [security] Add mutual authentication for RPC and data plane ## What is the purpose of the change Currently, the Flink processes encrypt their connections via SSL: - Data exchange TM - TM - RPC JM - TM - Blob Service JM - TM - (Optionally to ZooKeeper and connectors, this is connector-specific and not in scope of this change) However, the server side always accepts any client to build up the connection, meaning the connections are not strongly authenticated. Activating SSL mutual authentication strengthens this significantly - only processes that have access to the same certificate can connect. ## Brief change log - Activate mutual auth in akka (via akka config) - Activate mutual auth in Netty for data shuffles via `SSLContext` and `SSLEngine` parameters ## Verifying this change - Adds a test to the `NettyClientServerSslTest` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/StephanEwen/incubator-flink mutual_auth Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5966.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5966 commit 8bceb03d5653c94247b72d6256f4e9e37b036e35 Author: Stephan Ewen Date: 2018-05-07T17:44:33Z [FLINK-9313] [security] Activate mutual authentication for RPC/akka commit 59b017580d30904418e0867ac122a8183dc5db70 Author: Stephan Ewen Date: 2018-05-07T19:28:41Z [FLINK-9314] [security] Add mutual authentication for Netty / TaskManager's data plane ---
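As a rough illustration of the Netty-side change described in the change log, requiring client authentication on the server's `SSLEngine` is essentially one flag (this snippet is a sketch using the standard JSSE API, not the PR's actual code):

```java
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;

final class MutualAuthSketch {

    static SSLEngine serverEngineWithMutualAuth(SSLContext sslContext) {
        SSLEngine engine = sslContext.createSSLEngine();
        engine.setUseClientMode(false);
        // Require the client to present a certificate from the shared trust store,
        // so only processes holding the same certificate can establish a connection.
        engine.setNeedClientAuth(true);
        return engine;
    }
}
```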
[GitHub] flink issue #5965: [FLINK-9310] [security] Update standard cipher suites for...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5965 @EronWright This might be interesting to you. ---
[GitHub] flink pull request #5965: [FLINK-9310] [security] Update standard cipher sui...
GitHub user StephanEwen opened a pull request: https://github.com/apache/flink/pull/5965 [FLINK-9310] [security] Update standard cipher suites for secure mode ## What is the purpose of the change This sets the cipher suites accepted by default to those recommended in IETF RFC 7525 : https://tools.ietf.org/html/rfc7525 ## Brief change log Updates the default value of the respective config option to ``` TLS_DHE_RSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,TLS_DHE_RSA_WITH_AES_256_GCM_SHA384,TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384 ``` ## Verifying this change This change is already covered by the existing tests that test SSL setups. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (not applicable / **docs** / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/StephanEwen/incubator-flink update_cipher_suits Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5965.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5965 commit 9b24574cd437ddbc2d3546c1fa0f73e983c02e31 Author: Stephan Ewen Date: 2018-05-07T17:47:00Z [FLINK-9310] [security] Update standard cipher suites for secure mode This sets the cipher suites accepted by default to those recommended in IETF RFC 7525 : https://tools.ietf.org/html/rfc7525 ---
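For completeness, a hedged example of setting the same cipher suite list programmatically via Flink's `Configuration` and the `SecurityOptions.SSL_ALGORITHMS` option referenced in the SSLUtils diff elsewhere in this thread (whether that option name matches a given Flink version is an assumption here):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.SecurityOptions;

final class CipherSuiteConfig {

    static Configuration withRfc7525CipherSuites() {
        Configuration config = new Configuration();
        // Restrict the enabled cipher suites to the RFC 7525 recommendations,
        // the same list this PR proposes as the new default.
        config.setString(SecurityOptions.SSL_ALGORITHMS,
                "TLS_DHE_RSA_WITH_AES_128_GCM_SHA256,"
                        + "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,"
                        + "TLS_DHE_RSA_WITH_AES_256_GCM_SHA384,"
                        + "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384");
        return config;
    }
}
```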
[GitHub] flink issue #5936: [FLINK-9265] Upgrade Prometheus version
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5936 Yes, before giving +1 to this commit, we need to check that this introduces no new transitive dependency, or need to make sure that dependency is not an issue. ---
[GitHub] flink issue #5427: [FLINK-8533] [checkpointing] Support MasterTriggerRestore...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5427 Thanks a lot, looks good, can merge this now. One quick question: You decided to have empty default implementations for the new methods in the master hook interface. Given that Pravega is currently the only known user of that interface, I would be okay with breaking the interface (no default methods) if you think that would be cleaner. ---
[GitHub] flink pull request #5954: [FLINK-9276] Improve error message when TaskManage...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5954#discussion_r186376326 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java --- @@ -296,7 +296,7 @@ public TaskManagerLocation addTaskManager(int numberSlots) { @Override public void releaseTaskManager(ResourceID resourceId) { try { - slotPool.releaseTaskManager(resourceId).get(); + slotPool.releaseTaskManager(resourceId, null).get(); --- End diff -- I would avoid passing null as the exception here. ---
[GitHub] flink pull request #5954: [FLINK-9276] Improve error message when TaskManage...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5954#discussion_r186376122 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java --- @@ -1070,13 +1071,22 @@ protected void timeoutPendingSlotRequest(SlotRequestId slotRequestId) { removePendingRequest(slotRequestId); } - private void releaseTaskManagerInternal(final ResourceID resourceId) { - final FlinkException cause = new FlinkException("Releasing TaskManager " + resourceId + '.'); + private void releaseTaskManagerInternal(final ResourceID resourceId, final Exception cause) { --- End diff -- I think we can just use the `cause` exception and do not need to re-wrap it into another exception. If re-wrapping is needed for some reason, we need to add the original exception as the cause. ---
[GitHub] flink pull request #5954: [FLINK-9276] Improve error message when TaskManage...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5954#discussion_r186376182 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java --- @@ -86,9 +86,10 @@ * Releases a TaskExecutor with the given {@link ResourceID} from the {@link SlotPool}. * * @param resourceId identifying the TaskExecutor which shall be released from the SlotPool +* @param cause for the release the TaskManager * @return Future acknowledge which is completed after the TaskExecutor has been released */ - CompletableFuture releaseTaskManager(final ResourceID resourceId); + CompletableFuture releaseTaskManager(final ResourceID resourceId, final Exception cause); --- End diff -- Throwable, see above. ---
[GitHub] flink pull request #5954: [FLINK-9276] Improve error message when TaskManage...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5954#discussion_r186375693 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java --- @@ -212,7 +212,7 @@ public void start(JobMasterId jobMasterId, String newJobManagerAddress) throws E // release all registered slots by releasing the corresponding TaskExecutors for (ResourceID taskManagerResourceId : registeredTaskManagers) { - releaseTaskManagerInternal(taskManagerResourceId); + releaseTaskManagerInternal(taskManagerResourceId, null); --- End diff -- I would try to avoid null here. It is still helpful to have a message that says "SlotPool is shutting down". ---
[GitHub] flink pull request #5954: [FLINK-9276] Improve error message when TaskManage...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5954#discussion_r186375841 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java --- @@ -1050,11 +1050,12 @@ else if (availableSlots.tryRemove(allocationID)) { * when we find some TaskManager becomes "dead" or "abnormal", and we decide to not using slots from it anymore. * * @param resourceId The id of the TaskManager +* @param cause for the release the TaskManager */ @Override - public CompletableFuture releaseTaskManager(final ResourceID resourceId) { + public CompletableFuture releaseTaskManager(final ResourceID resourceId, final Exception cause) { --- End diff -- I would use `Throwable` in the signatures. It may always be that some Error is the cause (class not found, etc.) ---
[GitHub] flink issue #5931: [FLINK-9190][flip6,yarn] Request new container if contain...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5931 @sihuazhou and @shuai-xu thank you for your help in understanding the bug here. Let me rephrase it to make sure I understand the problem exactly. The steps are the following: 1. JobMaster / SlotPool requests a slot (AllocationID) from the ResourceManager 2. ResourceManager starts a container with a TaskManager 3. TaskManager registers at ResourceManager, which tells the TaskManager to push a slot to the JobManager. 4. TaskManager container is killed 5. The ResourceManager does not queue back the slot requests (AllocationIDs) that it sent to the previous TaskManager, so the requests are lost and need to time out before another attempt is tried. Some thoughts on how to deal with this: - It seems the ResourceManager should put the slots from the failed TaskManager back to "pending" so they are given to the next TaskManager that starts. - I assume that is not happening, because there is concern that the failure is also detected on the JobManager/SlotPool and retried there as well, leading to double retries. - The solution would be to better define the protocol with respect to who is responsible for what retries. Two ideas on how to fix that: 1. The ResourceManager notifies the SlotPool that a certain set of AllocationIDs has failed, and the SlotPool directly retries the allocations, resulting in directly starting new containers. 2. The ResourceManager always retries allocations for AllocationIDs it knows. The SlotPool would not retry, it would keep the same allocations always unless they are released as unneeded. We would probably need something to make sure that the SlotPool can distinguish between different offers of the same AllocationID (in case the ResourceManager assumes a timeout but a request actually goes through) - possibly something like an attempt-counter (higher wins). @tillrohrmann also interested in your thoughts here. ---
[GitHub] flink issue #5951: [FLINK-9293] [runtime] SlotPool should check slot id when...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5951 Good catch! Thank you for the PR. Will try to review this asap... ---
[GitHub] flink pull request #5928: [hotfix][doc] fix doc of externalized checkpoint
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5928#discussion_r185737131 --- Diff: docs/ops/state/state_backends.md --- @@ -152,7 +152,7 @@ Possible values for the config entry are *jobmanager* (MemoryStateBackend), *fil name of the class that implements the state backend factory [FsStateBackendFactory](https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java), such as `org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory` for RocksDBStateBackend. -In the case where the default state backend is set to *filesystem*, the entry `state.backend.fs.checkpointdir` defines the directory where the checkpoint data will be stored. +In the case where the default state backend is set to *filesystem*, the entry `state.checkpoints.dir` defines the directory where the checkpoint data will be stored. --- End diff -- The option is used by all backends that eventually store data to file system, including RocksDBStateBackend and MemoryStateBackend (the MemoryStateBackend writes its single checkpoint metadata file there, optionally) ---
[GitHub] flink issue #5928: [hotfix][doc] fix doc of externalized checkpoint
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5928 In Flink 1.5, all checkpoints are externalized. There is no notion of externalized checkpoints any more, just a setting to configure the retention (retain on cancel, retain on fail, never retain). The `state.checkpoints.dir` is the recommended option for the directory where checkpoints are stored (and `state.backend.fs.checkpointdir` is a deprecated key for the same setting). ---
[GitHub] flink issue #5929: [FLINK-8497] [connectors] KafkaConsumer throws NPE if top...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5929 I am not sure if failing is the right behavior here. What if someone has a Flink job running and one of the topics for which partitions are discovered gets deleted? That would fail the job in such a way that it never recovers, even if other partitions are still valid. ---
[GitHub] flink issue #5936: [FLINK-9265] Upgrade Prometheus version
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5936 Could you add the dependency tree of the upgraded Prometheus dependency to check? ---
[GitHub] flink issue #5939: [FLINK-8500] [Kafka Connector] Get the timestamp of the K...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5939 The feature is a nice addition. Flink currently already adds the timestamp as the record's event time timestamp. You can access it via a ProcessFunction. That is a tad bit more clumsy, though... If we want to have the timestamp as part of the Deserialization Schema, I would suggest not adding yet another specialized schema, but extending the KeyedDeserializationSchema with another method that takes the timestamp. We should make that a default method that calls the existing method, and make the existing method an empty default method. We could also think about renaming `KeyedDeserializationSchema` to `RichDeserializationSchema` or so, if that would describe the functionality better (I am not a native speaker, so it would be nice for someone to give their opinion here). ---
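A hedged sketch of what such a backwards-compatible default method might look like — the interface and method shapes below are simplified illustrations, not the exact connector API:

```java
import java.io.IOException;
import java.io.Serializable;

// Simplified stand-in for the Kafka deserialization schema discussed above.
interface RichDeserializationSchema<T> extends Serializable {

    // Existing method, unchanged, so current implementations keep working.
    T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)
            throws IOException;

    // New default method that also exposes the record timestamp; by default it simply
    // delegates to the old method, so implementing it stays opt-in.
    default T deserialize(byte[] messageKey, byte[] message, String topic, int partition,
            long offset, long timestamp) throws IOException {
        return deserialize(messageKey, message, topic, partition, offset);
    }
}
```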
[GitHub] flink pull request #5944: [FLINK-8900] [yarn] Set correct application status...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5944#discussion_r185732563 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java --- @@ -131,6 +133,17 @@ protected JobGraph retrieveJobGraph(Configuration configuration) throws FlinkExc } } + @Override + protected void registerShutdownActions(CompletableFuture terminationFuture) { + terminationFuture.whenComplete((status, throwable) -> --- End diff -- Will update ---
[GitHub] flink pull request #5944: [FLINK-8900] [yarn] Set correct application status...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5944#discussion_r185732507 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java --- @@ -109,7 +119,11 @@ public MiniDispatcher( if (executionMode == ClusterEntrypoint.ExecutionMode.NORMAL) { // terminate the MiniDispatcher once we served the first JobResult successfully - jobResultFuture.whenComplete((JobResult ignored, Throwable throwable) -> shutDown()); + jobResultFuture.whenComplete((JobResult result, Throwable throwable) -> { + ApplicationStatus status = result.getSerializedThrowable().isPresent() ? + ApplicationStatus.FAILED : ApplicationStatus.SUCCEEDED; + jobTerminationFuture.complete(status); --- End diff -- True, I had it like that initially, but found the above version more readable in the end, because we don't really use the serializedThrowable (making the map() a bit strange). ---
[GitHub] flink pull request #5944: [FLINK-8900] [yarn] Set correct application status...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5944#discussion_r185732539 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java --- @@ -109,7 +119,11 @@ public MiniDispatcher( if (executionMode == ClusterEntrypoint.ExecutionMode.NORMAL) { // terminate the MiniDispatcher once we served the first JobResult successfully - jobResultFuture.whenComplete((JobResult ignored, Throwable throwable) -> shutDown()); + jobResultFuture.whenComplete((JobResult result, Throwable throwable) -> { --- End diff -- Will update ---
[GitHub] flink pull request #5948: [FLINK-9286][docs] Update classloading docs
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5948#discussion_r185731303 --- Diff: docs/monitoring/debugging_classloading.md --- @@ -69,9 +69,9 @@ cases. By default, Java ClassLoaders will first look for classes in the parent C the child ClassLoader for cases where we have a hierarchy of ClassLoaders. This is problematic if you have in your user jar a version of a library that conflicts with a version that comes with Flink. You can change this behaviour by configuring the ClassLoader resolution order via -`classloader.resolve-order: child-first` in the Flink config. However, Flink classes will still +[classloader.resolve-order](../ops/config.html#classloader-resolve-order) in the Flink config. However, Flink classes will still be resolved through the parent ClassLoader first, although you can also configure this via -`classloader.parent-first-patterns` (see [config](../ops/config.html)) +[classloader.parent-first-patterns-default](../ops/config.html#classloader-parent-first-patterns-default). --- End diff -- How about referencing `classloader.parent-first-patterns-additional` here instead? The recommendation is not to change the value for "default"... ---
[GitHub] flink issue #5949: [FLINK-9288][docs] clarify the event time / watermark doc...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5949 Looks good, thanks, merging this... ---
[GitHub] flink issue #5900: [FLINK-9222][docs] add documentation for setting up Gradl...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5900 Nice addition! A few things I would like to double check on the quickstart configuration (I am not fluent enough in Gradle): - We do not need to hide/shade any dependencies in the user code. In Maven, we use the shade plugin, but only to build an uber jar, not to actually relocate dependencies. Is that the same in the Gradle quickstart? - The Flink core dependencies need to be in a scope equivalent to "provided", so they do not end up in the uber jar. Can we do something similar in Gradle? This has been a frequent source of unnecessarily bloated application jars. - The Maven quickstart template uses a trick to make sure that the provided dependencies are still in the classpath when we run the program in the IDE: A profile that activates in IDEA (by a property variable) and alters the scope from *provided* to *compile*. Not sure if that is strictly necessary, but may be helpful. ---
[GitHub] flink issue #5914: [FLINK-9256][network] fix NPE in SingleInputGate#updateIn...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5914 Change looks very good, thanks! Merging this... We can probably remove most of this code out again later, once we drop the non-credit-based code paths in the next releases. But that still makes this a necessary fix for now... ---
[GitHub] flink issue #5916: [hotfix][tests] remove redundant rebalance in SuccessAfte...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5916 Merging... ---
[GitHub] flink issue #5942: [FLINK-9274][kafka] add thread name for partition discove...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5942 Good fix, thanks! Merging... ---
[GitHub] flink issue #5943: [FLINK-9275][streaming] add taskName to the output flushe...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5943 Good fix, thanks, merging... ---
[GitHub] flink issue #5938: [FLINK-9196][flip6, yarn] Cleanup application files when ...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5938 Merging this... ---
[GitHub] flink pull request #5938: [FLINK-9196][flip6, yarn] Cleanup application file...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5938#discussion_r185080370 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java --- @@ -570,6 +571,21 @@ public LeaderConnectionInfo getClusterConnectionInfo() throws LeaderRetrievalExc }); } + @Override + public void shutDownCluster() { + try { + sendRetryableRequest( + ShutdownHeaders.getInstance(), + EmptyMessageParameters.getInstance(), + EmptyRequestBody.getInstance(), + isConnectionProblemException()).get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (ExecutionException e) { + log.error("Error while shutting down cluster", e); --- End diff -- Throw the cause of the `ExecutionException`? ---
[GitHub] flink issue #5944: [FLINK-8900] [yarn] Set correct application status when j...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5944 The test failure is unrelated - just test flakiness. ---
[GitHub] flink issue #5931: [FLINK-9190][flip6,yarn] Request new container if contain...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5931 @GJL Briefly digging through the log, there are a few strange things happening: - `YarnResourceManager` still has 8 pending requests even when 11 containers are running: ```Received new container: container_1524853016208_0001_01_000184 - Remaining pending container requests: 8``` - Some slots are requested and then the requests are cancelled again - In the end, one request is not fulfilled: `aeec2a9f010a187e04e31e6efd6f0f88` Might be an inconsistency either in the `SlotManager` or the `SlotPool`. ---
[GitHub] flink pull request #5944: [FLINK-8900] [yarn] Set correct application status...
GitHub user StephanEwen opened a pull request: https://github.com/apache/flink/pull/5944 [FLINK-8900] [yarn] Set correct application status when job is finished ## What is the purpose of the change When finite Flink applications (batch jobs) are sent to YARN in the detached mode, the final status is currently always the same, because the job's result is not passed to the logic that initiates the application shutdown. This PR forwards the final job status via a future that is used to register the shutdown handlers. ## Brief change log - Introduce the `JobTerminationFuture` in the `MiniDispatcher` - ## Verifying this change ``` bin/flink run -m yarn-cluster -yjm 2048 -ytm 2048 ./examples/streaming/WordCount.jar ``` - Run the batch job as described above on YARN to succeed, check that the final application status is successful. - Run the batch job with a parameter to a non existing input file on YARN, check that the final application status is failed. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no)** - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/StephanEwen/incubator-flink yarn_fix Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5944.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5944 commit f4130c64420e2ad2acb680869c9b84aa5dbcc7c7 Author: Stephan Ewen Date: 2018-04-30T07:55:50Z [hotfix] [tests] Update log4j-test.properties Brings the logging definition in sync with other projects. Updates the classname for the suppressed logger in Netty to account for the new shading model introduced in Flink 1.4. commit 5fcc9aca392cbcd5dfa474b0a286868b44836f23 Author: Stephan Ewen Date: 2018-04-27T16:57:27Z [FLINK-8900] [yarn] Set correct application status when job is finished ---
[GitHub] flink issue #5897: [FLINK-9234] [table] Fix missing dependencies for externa...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5897 Can we actually get rid of `commons-configuration` in the table API? All the commons packages with their weird long tail of not properly declared dependencies have become a bit of an anti-pattern to me over time... ---
[GitHub] flink issue #5924: [hotfix][README.md] Update building prerequisites
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5924 Thanks, merging this... ---
[GitHub] flink issue #5934: [FLINK-9269][state] fix concurrency problem when performi...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5934 Concerning serializer snapshots: - We need to move away from Java-serializing the serializers into the config snapshots anyways and should do that in the near future. - I think the config snapshot should be created once when the state is created, encoded as `byte[]`, and then we only write the bytes. That saves us from repeated work on every checkpoint and would also prevent concurrent access to the serializer for creating the snapshot. ---
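A minimal sketch of the "snapshot once, reuse the bytes" idea described above (all names here are hypothetical, not Flink's state backend API):

```java
import java.io.IOException;
import java.io.OutputStream;

// Illustrative only: the config snapshot is serialized once, up front, and every
// checkpoint just writes the cached bytes - no repeated snapshot work and no
// concurrent access to the serializer from checkpointing threads.
final class CachedSerializerConfigSnapshot {

    private final byte[] snapshotBytes;

    CachedSerializerConfigSnapshot(byte[] snapshotBytes) {
        this.snapshotBytes = snapshotBytes.clone();
    }

    void writeTo(OutputStream out) throws IOException {
        out.write(snapshotBytes);
    }
}
```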
[GitHub] flink issue #5892: [FLINK-9214] YarnClient should be stopped in YARNSessionC...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5892 Thanks, looks good, merging this... ---
[GitHub] flink issue #5928: [hotfix][doc] fix doc of externalized checkpoint
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5928 The configuration reference (`config.md`) should be generated from the config options by now, so it should not be manually edited. (@zentol could you chime in here?) ---
[GitHub] flink pull request #5928: [hotfix][doc] fix doc of externalized checkpoint
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5928#discussion_r185021293 --- Diff: docs/dev/stream/state/checkpointing.md --- @@ -137,11 +137,9 @@ Some more parameters and/or defaults may be set via `conf/flink-conf.yaml` (see - `jobmanager`: In-memory state, backup to JobManager's/ZooKeeper's memory. Should be used only for minimal state (Kafka offsets) or testing and local debugging. - `filesystem`: State is in-memory on the TaskManagers, and state snapshots are stored in a file system. Supported are all filesystems supported by Flink, for example HDFS, S3, ... -- `state.backend.fs.checkpointdir`: Directory for storing checkpoints in a Flink supported filesystem. Note: State backend must be accessible from the JobManager, use `file://` only for local setups. +- `state.checkpoints.dir`: The target directory for storing checkpoints data files and meta data of [externalized checkpoints]({{ site.baseurl }}/ops/state/checkpoints.html#externalized-checkpoints) in a Flink supported filesystem. Note: State backend must be accessible from the JobManager, use `file://` only for local setups. --- End diff -- Yes, `file:///` is what you use for many NAS style storage systems, so it is not local-only. Let's change this to say that the storage path must be accessible from all participating processes/nodes, i.e., all TaskManagers and JobManagers ---