[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

2018-05-15 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r188349853
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchema.java
 ---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format using 
{@link SchemaCoder}.
+ *
+ * @param <T> type of record it produces
+ */
+public class RegistryAvroDeserializationSchema<T> extends AvroDeserializationSchema<T> {
+   private final SchemaCoder.SchemaCoderProvider schemaCoderProvider;
+   private transient SchemaCoder schemaCoder;
+
+   /**
+* Creates Avro deserialization schema that reads schema from input 
stream using provided {@link SchemaCoder}.
+*
+* @param recordClazz class to which deserialize. Should be 
either
+*{@link SpecificRecord} or {@link 
GenericRecord}.
+* @param reader  reader's Avro schema. Should be provided 
if recordClazz is
+*{@link GenericRecord}
+* @param schemaCoderProvider schema provider that allows instantiation 
of {@link SchemaCoder} that will be used for
+*schema reading
+*/
+   protected RegistryAvroDeserializationSchema(
+   Class<T> recordClazz,
+   @Nullable Schema reader,
+   SchemaCoder.SchemaCoderProvider schemaCoderProvider) {
+   super(recordClazz, reader);
--- End diff --

Empty line / indentation


---


[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

2018-05-15 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r188321914
  
--- Diff: 
flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroDeserializationSchema.java
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.registry.confluent;
+
+import org.apache.flink.formats.avro.AvroDeserializationSchema;
+import org.apache.flink.formats.avro.RegistryAvroDeserializationSchema;
+import org.apache.flink.formats.avro.SchemaCoder;
+
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format using 
{@link SchemaCoder} that uses
+ * Confluent Schema Registry.
+ *
+ * @param <T> type of record it produces
+ */
+public class ConfluentRegistryAvroDeserializationSchema<T>
+   extends RegistryAvroDeserializationSchema<T> {
+
+   private static final int DEFAULT_IDENTITY_MAP_CAPACITY = 1000;
+
+   /**
+* Creates a Avro deserialization schema.
+*
+* @param recordClazz class to which deserialize. Should be 
either
+*{@link SpecificRecord} or {@link 
GenericRecord}.
+* @param reader  reader's Avro schema. Should be provided 
if recordClazz is
+*{@link GenericRecord}
+* @param schemaCoderProvider provider for schema coder that reads 
writer schema from Confluent Schema Registry
+*/
+   private ConfluentRegistryAvroDeserializationSchema(
+   Class<T> recordClazz,
+   @Nullable Schema reader,
+   SchemaCoder.SchemaCoderProvider schemaCoderProvider) {
+   super(recordClazz, reader, schemaCoderProvider);
+   }
+
+   /**
+* Creates {@link ConfluentRegistryAvroDeserializationSchema} that 
produces {@link GenericRecord}
+* using provided reader schema and looks up writer schema in Confluent 
Schema Registry.
+*
+* @param schema schema of produced records
+* @param urlurl of schema registry to connect
+* @return deserialized record in form of {@link GenericRecord}
+*/
+   public static ConfluentRegistryAvroDeserializationSchema<GenericRecord> forGeneric(
+   Schema schema, String url) {
+   return forGeneric(schema, url, DEFAULT_IDENTITY_MAP_CAPACITY);
+   }
+
+   /**
+* Creates {@link ConfluentRegistryAvroDeserializationSchema} that 
produces {@link GenericRecord}
+* using provided reader schema and looks up writer schema in Confluent 
Schema Registry.
+*
+* @param schema  schema of produced records
+* @param url url of schema registry to connect
+* @param identityMapCapacity maximum number of cached schema versions 
(default: 1000)
+* @return deserialized record in form of {@link GenericRecord}
+*/
+   public static ConfluentRegistryAvroDeserializationSchema<GenericRecord> forGeneric(
+   Schema schema, String url, int identityMapCapacity) {
+   return new ConfluentRegistryAvroDeserializationSchema<>(
+   GenericRecord.class,
+   schema,
+   new CachedSchemaCoderProvider(url, 
identityMapCapacity));
+   }
+
+   /**
+* Creates {@link AvroDeserializationSchema} that produces classes that 
were generated from avro
+* schema and looks up writer schema in Confluent Schema Registry.
+*
+* @param tClass class of record to be produced
+* @param urlurl of schema registry to connect
+* @return deserialized record
  

[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

2018-05-15 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r188355756
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
 ---
@@ -275,9 +304,19 @@ else if (configSnapshot instanceof 
AvroSerializerConfigSnapshot) {
//  Utilities
// 

 
+   private static boolean isGenericRecord(Class<?> type) {
+   return !SpecificRecord.class.isAssignableFrom(type) &&
+   GenericRecord.class.isAssignableFrom(type);
+   }
+
@Override
public TypeSerializer<T> duplicate() {
-   return new AvroSerializer<>(type);
+   if (schemaString != null) {
+   return new AvroSerializer<>(type, new 
Schema.Parser().parse(schemaString));
--- End diff --

Duplication happens frequently, would be good to avoid schema parsing. You 
can add a private copy constructor that takes class and string.
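
A rough sketch of that suggestion (the field names `type` and `schemaString` follow the diff above; the exact class layout is assumed):

```
// Sketch only: a copy constructor that skips schema parsing when duplicating.
private AvroSerializer(Class<T> type, String schemaString) {
	this.type = type;
	this.schemaString = schemaString;  // the Schema itself stays lazily initialized
}

@Override
public TypeSerializer<T> duplicate() {
	return new AvroSerializer<>(type, schemaString);
}
```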


---


[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

2018-05-15 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r188355390
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
 ---
@@ -99,9 +108,29 @@
 
/**
 * Creates a new AvroSerializer for the type indicated by the given 
class.
+* This constructor is intended to be used with {@link SpecificRecord} 
or reflection serializer.
+* For serializing {@link GenericData.Record} use {@link 
AvroSerializer#AvroSerializer(Class, Schema)}
 */
public AvroSerializer(Class type) {
+   Preconditions.checkArgument(!isGenericRecord(type),
--- End diff --

Minor: Other preconditions checks in this class are done by statically 
imported methods. While this is not consistent within the code base, I would 
suggest to keep this consistent within a class as much as possible.
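
With the statically imported form, the check would read roughly like this (a sketch; the message text is illustrative, not the PR's wording):

```
import static org.apache.flink.util.Preconditions.checkArgument;

public AvroSerializer(Class<T> type) {
	checkArgument(!isGenericRecord(type),
		"For GenericData.Record use the AvroSerializer(Class, Schema) constructor instead.");
	// ... rest of the constructor unchanged
}
```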


---


[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

2018-05-15 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r188348636
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
 ---
@@ -0,0 +1,215 @@
+/*
--- End diff --

That would mean initializing it all lazily, in `getDatumReader()` or in a 
`checkAvroInitialized()` method.
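
For example, a lazy getter along these lines (a sketch; `getReaderSchema()` is assumed to be a helper returning the reader schema, the other names follow the diff):

```
// Sketch: create the transient reader on first access instead of in readObject().
private GenericDatumReader<T> getDatumReader() {
	if (datumReader == null) {
		if (SpecificRecord.class.isAssignableFrom(recordClazz)) {
			datumReader = new SpecificDatumReader<>(recordClazz);
		} else {
			datumReader = new GenericDatumReader<>(getReaderSchema());
		}
	}
	return datumReader;
}
```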


---


[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

2018-05-15 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r188328437
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
 ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
--- End diff --

Double Apache header


---


[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

2018-05-15 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r188337378
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
 ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
+import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
+import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format.
+ *
+ * @param <T> type of record it produces
+ */
+public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
--- End diff --

This should have a serial version UID.
You can activate the respective inspections in IntelliJ to warn about such 
issues.
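
For reference, the missing declaration is the usual one-liner:

```
private static final long serialVersionUID = 1L;
```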


---


[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

2018-05-15 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r188336725
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
 ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
+import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
+import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format.
+ *
+ * @param <T> type of record it produces
+ */
+public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
+
+   /**
+* Class to deserialize to.
+*/
+   private Class<T> recordClazz;
--- End diff --

All fields should be final whenever possible - immutability as the default 
choice.
That acts both as documentation about the writer's intention and makes it 
future proof.


---


[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

2018-05-15 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r188350196
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchema.java
 ---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format using 
{@link SchemaCoder}.
+ *
+ * @param <T> type of record it produces
+ */
+public class RegistryAvroDeserializationSchema<T> extends AvroDeserializationSchema<T> {
+   private final SchemaCoder.SchemaCoderProvider schemaCoderProvider;
+   private transient SchemaCoder schemaCoder;
+
+   /**
+* Creates Avro deserialization schema that reads schema from input 
stream using provided {@link SchemaCoder}.
+*
+* @param recordClazz class to which deserialize. Should be 
either
+*{@link SpecificRecord} or {@link 
GenericRecord}.
+* @param reader  reader's Avro schema. Should be provided 
if recordClazz is
+*{@link GenericRecord}
+* @param schemaCoderProvider schema provider that allows instantiation 
of {@link SchemaCoder} that will be used for
+*schema reading
+*/
+   protected RegistryAvroDeserializationSchema(
+   Class<T> recordClazz,
+   @Nullable Schema reader,
+   SchemaCoder.SchemaCoderProvider schemaCoderProvider) {
+   super(recordClazz, reader);
+   this.schemaCoderProvider = schemaCoderProvider;
+   this.schemaCoder = schemaCoderProvider.get();
+   }
+
+   @Override
+   public T deserialize(byte[] message) {
+   // read record
+   try {
+   getInputStream().setBuffer(message);
+   Schema writerSchema = 
schemaCoder.readSchema(getInputStream());
+   Schema readerSchema = getReaderSchema();
+
+   GenericDatumReader<T> datumReader = getDatumReader();
+
+   datumReader.setSchema(writerSchema);
+   datumReader.setExpected(readerSchema);
+
+   return datumReader.read(null, getDecoder());
+   } catch (Exception e) {
+   throw new RuntimeException("Failed to deserialize 
Row.", e);
+   }
+   }
+
+   private void readObject(ObjectInputStream ois) throws 
ClassNotFoundException, IOException {
--- End diff --

See above, would suggest to avoid readObject()


---


[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

2018-05-15 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r188353074
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
 ---
@@ -79,15 +87,16 @@
 
//  runtime fields, non-serializable, lazily initialized 
---
 
-   private transient SpecificDatumWriter<T> writer;
-   private transient SpecificDatumReader<T> reader;
+   private transient GenericDatumWriter<T> writer;
+   private transient GenericDatumReader<T> reader;
 
private transient DataOutputEncoder encoder;
private transient DataInputDecoder decoder;
 
-   private transient SpecificData avroData;
+   private transient GenericData avroData;
 
private transient Schema schema;
+   private final String schemaString;
--- End diff --

As per the comments, the existing code orders config fields before runtime 
fields. Can you place the schema to match that pattern?


---


[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

2018-05-15 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r188328926
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
 ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
+import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
+import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format.
+ *
+ * @param <T> type of record it produces
+ */
+public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
+
+   /**
+* Class to deserialize to.
+*/
+   private Class<T> recordClazz;
+
+   private String schemaString = null;
--- End diff --

I would avoid null initialization, it is redundant. It actually does 
nothing (fields are null anyways) but actually exists as byte code, hence costs 
CPU cycles.


---


[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

2018-05-15 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r188349785
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchema.java
 ---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format using 
{@link SchemaCoder}.
+ *
+ * @param  type of record it produces
+ */
+public class RegistryAvroDeserializationSchema extends 
AvroDeserializationSchema {
+   private final SchemaCoder.SchemaCoderProvider schemaCoderProvider;
--- End diff --

We typically do empty lines between class declaration and members.


---


[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

2018-05-15 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r188338897
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
 ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
+import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
+import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format.
+ *
+ * @param <T> type of record it produces
+ */
+public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
+
+   /**
+* Class to deserialize to.
+*/
+   private Class<T> recordClazz;
+
+   private String schemaString = null;
--- End diff --

You can make this variable final as well.


---



[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

2018-05-15 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r188347289
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
 ---
@@ -0,0 +1,215 @@
+/*
--- End diff --

This class uses a mixture of eager initialization of transient members (in 
readObject()) and lazy initialization (in getDatumReader()).

I would suggest to do it all one way or the other.

My suggestion would be to avoid `readObject()` whenever possible. If you 
encounter an exception during schema parsing (and it may be something weird 
from Jackson, like a missing manifest due to a shading issue), you will get the 
most unhelpful exception stack trace ever, in the weirdest place (like Flink's 
RPC message decoder).

In my experience, when a user sees such a stack trace, they are rarely able 
to diagnose that. Best case they show up on the mailing list, worst case they 
give up.
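
Concretely, the lazy variant could look roughly like this (a sketch; `MutableByteArrayInputStream` and the field names come from the diff, the rest is illustrative):

```
// Sketch: initialize all transient members on the first deserialize() call instead of
// in readObject(), so schema-parsing failures surface in user code with a useful trace.
private void checkAvroInitialized() {
	if (datumReader != null) {
		return;
	}
	this.reader = schemaString != null ? new Schema.Parser().parse(schemaString) : null;
	this.inputStream = new MutableByteArrayInputStream();
	this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
	if (SpecificRecord.class.isAssignableFrom(recordClazz)) {
		this.datumReader = new SpecificDatumReader<>(recordClazz);
	} else {
		this.datumReader = new GenericDatumReader<>(reader);
	}
}

@Override
public T deserialize(byte[] message) throws IOException {
	checkAvroInitialized();
	inputStream.setBuffer(message);
	return datumReader.read(null, decoder);
}
```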


---


[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

2018-05-15 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r188319278
  
--- Diff: 
flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroDeserializationSchema.java
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.registry.confluent;
+
+import org.apache.flink.formats.avro.AvroDeserializationSchema;
+import org.apache.flink.formats.avro.RegistryAvroDeserializationSchema;
+import org.apache.flink.formats.avro.SchemaCoder;
+
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format using 
{@link SchemaCoder} that uses
+ * Confluent Schema Registry.
+ *
+ * @param  type of record it produces
+ */
+public class ConfluentRegistryAvroDeserializationSchema<T>
+   extends RegistryAvroDeserializationSchema<T> {
+
+   private static final int DEFAULT_IDENTITY_MAP_CAPACITY = 1000;
+
+   /**
+* Creates a Avro deserialization schema.
+*
+* @param recordClazz class to which deserialize. Should be 
either
+*{@link SpecificRecord} or {@link 
GenericRecord}.
+* @param reader  reader's Avro schema. Should be provided 
if recordClazz is
+*{@link GenericRecord}
+* @param schemaCoderProvider provider for schema coder that reads 
writer schema from Confluent Schema Registry
+*/
+   private ConfluentRegistryAvroDeserializationSchema(
+   Class<T> recordClazz,
+   @Nullable Schema reader,
+   SchemaCoder.SchemaCoderProvider schemaCoderProvider) {
+   super(recordClazz, reader, schemaCoderProvider);
--- End diff --

For such situations, code in the method and parameter list should use 
different indentation, or be separated by an empty line. Otherwise it is 
hard to parse.


---


[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

2018-05-15 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r188349118
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
 ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
+import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
+import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format.
+ *
+ * @param <T> type of record it produces
+ */
+public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
+
+   /**
+* Class to deserialize to.
+*/
+   private Class<T> recordClazz;
+
+   private String schemaString = null;
+
+   /**
+* Reader that deserializes byte array into a record.
+*/
+   private transient GenericDatumReader<T> datumReader;
+
+   /**
+* Input stream to read message from.
+*/
+   private transient MutableByteArrayInputStream inputStream;
+
+   /**
+* Avro decoder that decodes binary data.
+*/
+   private transient Decoder decoder;
+
+   /**
+* Avro schema for the reader.
+*/
+   private transient Schema reader;
+
+   /**
+* Creates a Avro deserialization schema.
+*
+* @param recordClazz class to which deserialize. Should be one of:
+*{@link org.apache.avro.specific.SpecificRecord},
+*{@link org.apache.avro.generic.GenericRecord}.
+* @param reader  reader's Avro schema. Should be provided if 
recordClazz is
+*{@link GenericRecord}
+*/
+   AvroDeserializationSchema(Class<T> recordClazz, @Nullable Schema reader) {
+   Preconditions.checkNotNull(recordClazz, "Avro record class must 
not be null.");
+   this.recordClazz = recordClazz;
+   this.inputStream = new MutableByteArrayInputStream();
+   this.decoder = DecoderFactory.get().binaryDecoder(inputStream, 
null);
+   t

[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

2018-05-15 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r188329273
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
 ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
+import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
+import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format.
+ *
+ * @param <T> type of record it produces
+ */
+public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
+
+   /**
--- End diff --

Minor comment: Many newer classes pick up a style where the JavaDocs of 
fields are in one line, to make the fields section a bit more compact:
```
/** Class to deserialize to. */
private Class<T> recordClazz;
```


---


[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

2018-05-15 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r188319368
  
--- Diff: 
flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroDeserializationSchema.java
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.registry.confluent;
+
+import org.apache.flink.formats.avro.AvroDeserializationSchema;
+import org.apache.flink.formats.avro.RegistryAvroDeserializationSchema;
+import org.apache.flink.formats.avro.SchemaCoder;
+
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format using 
{@link SchemaCoder} that uses
+ * Confluent Schema Registry.
+ *
+ * @param <T> type of record it produces
+ */
+public class ConfluentRegistryAvroDeserializationSchema<T>
+   extends RegistryAvroDeserializationSchema<T> {
+
+   private static final int DEFAULT_IDENTITY_MAP_CAPACITY = 1000;
+
+   /**
+* Creates a Avro deserialization schema.
+*
+* @param recordClazz class to which deserialize. Should be 
either
+*{@link SpecificRecord} or {@link 
GenericRecord}.
+* @param reader  reader's Avro schema. Should be provided 
if recordClazz is
+*{@link GenericRecord}
+* @param schemaCoderProvider provider for schema coder that reads 
writer schema from Confluent Schema Registry
+*/
+   private ConfluentRegistryAvroDeserializationSchema(
+   Class<T> recordClazz,
+   @Nullable Schema reader,
+   SchemaCoder.SchemaCoderProvider schemaCoderProvider) {
+   super(recordClazz, reader, schemaCoderProvider);
+   }
+
+   /**
+* Creates {@link ConfluentRegistryAvroDeserializationSchema} that 
produces {@link GenericRecord}
+* using provided reader schema and looks up writer schema in Confluent 
Schema Registry.
+*
+* @param schema schema of produced records
+* @param urlurl of schema registry to connect
+* @return deserialized record in form of {@link GenericRecord}
+*/
+   public static ConfluentRegistryAvroDeserializationSchema<GenericRecord> forGeneric(
+   Schema schema, String url) {
+   return forGeneric(schema, url, DEFAULT_IDENTITY_MAP_CAPACITY);
--- End diff --

Same as above


---


[GitHub] flink issue #5982: [FLINK-9325][checkpoint]generate the meta file for checkp...

2018-05-15 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5982
  
My gut feeling is that we don't need `WriteMode.OVERWRITE` in cases where 
one wants such an atomic file creation...


---


[GitHub] flink issue #6016: [FLINK-8933] Avoid calling Class#newInstance(part 2)

2018-05-15 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/6016
  
I am not yet convinced that this fixes an actual problem, whereas it is not 
clear whether it may cause regression (exception traces or performance).

To me this is a case to not do this change, but spend effort on other 
issues.


---


[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

2018-05-15 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r188316052
  
--- Diff: flink-formats/flink-avro-confluent-registry/pom.xml ---
@@ -0,0 +1,94 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Apache License, Version 2.0 -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<parent>
+		<artifactId>flink-formats</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.6-SNAPSHOT</version>
+	</parent>
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>flink-avro-confluent-registry</artifactId>
--- End diff --

That is a good question - maybe eventually. We could leave it in 
`flink-formats` for now until we have a case to create `flink-catalogs`.

This is also not a full-blown catalog support, as for the Table API, but 
something much simpler - just multiple Avro Schemas.


---


[GitHub] flink issue #5982: [FLINK-9325][checkpoint]generate the meta file for checkp...

2018-05-15 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5982
  
Good point about the renaming on `close()` in case close is called for 
cleanup, rather than success.

We could follow the same semantics as in 
[CheckpointStateOutputStream](https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java#L61)

There the semantics are:
  - `close()` means "close on error / cleanup" and closes the stream and 
deletes the temp file.
  - `closeAndPublish()` would mean "close on success" and close the stream 
and rename the file.
  - After `closeAndPublish()` has been called, `close()` becomes a no-op.

The 
[FsCheckpointMetadataOutputStream](https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointMetadataOutputStream.java)
 implements that pattern, I think it worked well and is easy to use.
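
For illustration, the pattern boils down to something like the following (a sketch with made-up names on top of Flink's `FileSystem` API, not the actual `FsCheckpointMetadataOutputStream` code):

```
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.core.fs.Path;

import java.io.IOException;

/** Sketch of the "close() = cleanup, closeAndPublish() = commit" pattern. */
public class AtomicCreatingStream {

	private final FileSystem fs;
	private final Path tempPath;
	private final Path targetPath;
	private final FSDataOutputStream out;
	private boolean published;

	public AtomicCreatingStream(FileSystem fs, Path tempPath, Path targetPath) throws IOException {
		this.fs = fs;
		this.tempPath = tempPath;
		this.targetPath = targetPath;
		this.out = fs.create(tempPath, WriteMode.NO_OVERWRITE);
	}

	public void write(byte[] bytes) throws IOException {
		out.write(bytes);
	}

	/** Close on success: closes the stream and renames the temp file to its final name. */
	public void closeAndPublish() throws IOException {
		out.close();
		if (!fs.rename(tempPath, targetPath)) {
			throw new IOException("Could not publish " + targetPath);
		}
		published = true;
	}

	/** Close on error / cleanup: deletes the temp file; a no-op after closeAndPublish(). */
	public void close() throws IOException {
		if (!published) {
			out.close();
			fs.delete(tempPath, false);
		}
	}
}
```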




---


[GitHub] flink issue #6016: [FLINK-8933] Avoid calling Class#newInstance(part 2)

2018-05-15 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/6016
  
I am not sure we should be doing it like this.

The point of discouraging `Class.newInstance()` is not to copy replace this 
by a different call. That will not magically make anything better. The whole 
point is to go from a method that may sneakily throw undeclared exceptions, to 
a method that declares exceptions.

All places that do not require a change to the try/catch statement (add 
`InvocationTargetException`) have reasonable exception handling in place 
already, and are not vulnerable to the motivating issue.

In fact, for now this makes things worse, because exceptions get wrapped into 
`InvocationTargetException`, with the root cause in `getTargetException()`, 
not in the exception chain. That would actually be a serious regression.
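
To illustrate the difference (a generic example, not code from this PR):

```
import java.lang.reflect.InvocationTargetException;

// Class#newInstance() rethrows whatever the constructor throws, including undeclared
// checked exceptions. The replacement wraps constructor failures instead, so the
// caller has to unwrap them explicitly to keep the root cause in the exception chain.
static <T> T instantiate(Class<T> clazz) {
	try {
		return clazz.getDeclaredConstructor().newInstance();
	} catch (InvocationTargetException e) {
		// the real failure is the target exception, not e itself
		throw new RuntimeException(e.getTargetException());
	} catch (ReflectiveOperationException e) {
		throw new RuntimeException("Cannot instantiate " + clazz.getName(), e);
	}
}
```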

As a more general comment: Copy/replace style pull requests are a bit 
tricky, in my opinion - they are rather easy to create (bulk fix tool) and 
require committers to put up significant time to dig into the matter and to 
double check that it actually does not cause a regression in the individual 
spots that are changed. One can probably keep all committers busy for a year 
with just such fixes without really changing anything for users for the better, 
and having a high risk of regressions.

I would personally vote to not have many of those pull requests, or clearly 
label them as "cosmetic" and give them a very low priority in reviewing. 
Especially given that we have a decent backlog of pull requests for changes 
that where users will see a difference.


---


[GitHub] flink issue #6002: [FLINK-9350] Parameter baseInterval has wrong check messa...

2018-05-14 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/6002
  
Good fix, thanks.

+1 to merge


---


[GitHub] flink issue #5995: [FLINK-9337] Implemented AvroDeserializationSchema

2018-05-14 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5995
  
Thanks, the main code looks good!

Unfortunately, this seems to either break the compatibility with prior 
savepoints (when Avro types were implicitly handled through Kryo, now bridged 
through the `BackwardsCompatibleAvroSerializer`) or needs an adjustment to that test.

There are also some license header issues, causing the build to fail.


---


[GitHub] flink issue #5982: [FLINK-9325][checkpoint]generate the meta file for checkp...

2018-05-14 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5982
  
Thanks for preparing this.

I looked at the `TwoPhraseFSDataOutputStream` - maybe we can make this 
simpler. Do we need the distinction between phases? Is it not enough to behave 
as a regular stream, just overriding `close()` to do `super.close()` + 
`rename()`? That may be enough. When the stream is closed, all the writing 
methods anyways fail with a "stream closed exception".

Also, we need this method to be implemented in all FileSystem subclasses.

Typos: "Phrase" --> "Phase"
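
A rough sketch of the close()-renames-the-file variant described above (illustrative; `fileSystem`, `tempPath` and `targetPath` are assumed fields, this is not the PR's code):

```
@Override
public void close() throws IOException {
	super.close();                            // further writes now fail with "stream closed"
	fileSystem.rename(tempPath, targetPath);  // publish the file under its final name
}
```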


---


[GitHub] flink issue #5963: [FLINK-9305][s3] also register flink-s3-fs-hadoop's facto...

2018-05-14 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5963
  
Looks good, thanks.

+1 to merge


---


[GitHub] flink issue #5897: [FLINK-9234] [table] Fix missing dependencies for externa...

2018-05-13 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5897
  
Agreed, let's add it to master as well...


---


[GitHub] flink issue #5970: [FLINK-9292] [core] Remove TypeInfoParser (part 2)

2018-05-13 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5970
  
Looks good, thanks.

+1 to merge


---


[GitHub] flink issue #5834: [FLINK-9153] TaskManagerRunner should support rpc port ra...

2018-05-13 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5834
  
Committers usually have a lot of different responsibilities (releases, 
testing, helping users on mailing lists, working on roadmap features, etc.). 
All that takes a lot of time. Reviewing PRs is one important part, and we try 
to do this well, but with so many users now, it is not always perfect.

One big problem is that very few committers actually take the time to look 
at external contributions.

It might help to not always ping the same people (for example @zentol , 
@tillrohrmann , me, etc.) but some other committers as well. Here is a list of 
other committers, it is not quite complete, some newer ones are not yet listed: 
http://flink.apache.org/community.html#people

Hope that helps you understand...


---


[GitHub] flink issue #6000: [FLINK-9299] [Documents] ProcessWindowFunction documentat...

2018-05-13 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/6000
  
Congrats on having PR number 6000!

This overlaps with #6001, which is a bit more comprehensive (but needs some 
improvements).
Would you be up to coordinate to make one joint PR?


---


[GitHub] flink pull request #6000: [FLINK-9299] [Documents] ProcessWindowFunction doc...

2018-05-13 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/6000#discussion_r187798729
  
--- Diff: docs/dev/stream/operators/windows.md ---
@@ -730,9 +730,9 @@ input
 
 /* ... */
 
-public class MyProcessWindowFunction implements ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {
+public static class MyProcessWindowFunction extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {
--- End diff --

In other code samples, we don't put the `static`, not assuming that this is 
defined as an inner class


---


[GitHub] flink issue #5999: [FLINK-9348] [Documentation] scalastyle documentation for...

2018-05-13 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5999
  
This is helpful, thanks.

Merging...


---


[GitHub] flink issue #5966: [FLINK-9312] [security] Add mutual authentication for RPC...

2018-05-13 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5966
  
I agree, we need different key/truststores for the internal/external 
connectivity. This PR was meant as a step in that direction, separating at 
least within the SSL Utils the internal and external context setup.

In your thinking, is there ever a case for a different internal 
authentication method than "single trusted certificate"? What if we're not tied 
to Akka? (Side note: I think for internal communication, 'authentication is 
authorization' is probably reasonable, because there are no different users/roles 
for internal communication).

Would you assume that internally, we never do hostname verification?


---


[GitHub] flink pull request #5970: [FLINK-9292] [core] Remove TypeInfoParser (part 2)

2018-05-13 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5970#discussion_r187797400
  
--- Diff: 
flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java ---
@@ -62,6 +62,10 @@
 @SuppressWarnings("serial")
 public class UdfAnalyzerTest {
 
+   private static TypeInformation<Tuple2<String, Integer>> stringIntTuple2TypeInfo = TypeInformation.of(new TypeHint<Tuple2<String, Integer>>(){});
--- End diff --

These fields are constants, so they should be final. Can you add the 
modifier and rename the fields to match the naming convention?
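
For example (a sketch; the element types are inferred from the current field name):

```
private static final TypeInformation<Tuple2<String, Integer>> STRING_INT_TUPLE2_TYPE_INFO =
		TypeInformation.of(new TypeHint<Tuple2<String, Integer>>(){});
```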


---


[GitHub] flink issue #5970: [FLINK-9292] [core] Remove TypeInfoParser (part 2)

2018-05-13 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5970
  
One minor style comment, otherwise this is good to go!


---


[GitHub] flink issue #5897: [FLINK-9234] [table] Fix missing dependencies for externa...

2018-05-13 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5897
  
From my side, +1 to merge this to `release-1.4` and `release-1.5`.


---


[GitHub] flink issue #5897: [FLINK-9234] [table] Fix missing dependencies for externa...

2018-05-13 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5897
  
Forwarding some comment from @fhueske from JIRA:

> I tried to reproduce this issue for 1.5 but it seems to work.
> 
> Flink 1.5 should be out soon (release candidate 2 was published two days 
ago). We can merge a fix for 1.4, but would need to wait for 1.4.3 to be 
released before it is publicly available (unless you build from the latest 1.4 
branch).
> 
> `commons-configuration` is used for the external catalog support that was 
recently reworked for the unified table source generation. The code that needs 
the dependency was deprecated. I think we can drop the code and dependency for 
the 1.6 release.

That means we should merge this into `release-1.4` and `release-1.5`. In 
`master`, we could merge this, but should probably simply drop the 
pre-unified-source code.



---


[GitHub] flink issue #5982: [FLINK-9325][checkpoint]generate the meta file for checkp...

2018-05-11 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5982
  
How about adding the method `createAtomically` or so, with otherwise the 
same signature as the `create(Path, WriteMode)` method?


---


[GitHub] flink issue #5735: [FLINK-9036] [core] Add default values to State Descripto...

2018-05-11 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5735
  
@aljoscha Would be interested in your opinion here.

This is basically one of two ways to improve the handling of default values:
  1. Add a default value supplier on the state descriptor (this approach). 
Advantage is that you can use this to backwards compatibly handle the previous 
cases of default values (including the starting value for folding state)
  2. Add a `T getOrDefault(Supplier)` method to `ValueState` (see the sketch below). This might 
even be simpler to do, and more flexible as it allows for different 
default values in different contexts. This can get inefficient though when 
users naively create an anonymous class for the supplier (probably not a big 
deal any more since lambdas), and it breaks with the current approach, meaning 
we would have two different ways for default values that need to work together, one of 
which is deprecated but still needs to be supported until Flink 2.0.
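
For illustration, a minimal sketch of option 2, assuming such a Supplier-based accessor 
were added; the interface and method names are placeholders, not existing API:

```
import java.util.function.Supplier;

// Minimal sketch of option 2 (names are assumptions, not existing Flink API).
public interface ValueStateSketch<T> {

    // Existing-style accessor: returns the current value, or null if nothing was set.
    T value() throws Exception;

    // Hypothetical accessor: falls back to the supplied default when no value is present.
    default T getOrDefault(Supplier<T> defaultSupplier) throws Exception {
        T current = value();
        return current != null ? current : defaultSupplier.get();
    }
}
```

With lambdas, the call site stays cheap, e.g. `state.getOrDefault(() -> 0L)`.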


---


[GitHub] flink pull request #5977: [FLINK-9295][kafka] Fix transactional.id collision...

2018-05-11 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5977#discussion_r187639708
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
 ---
@@ -837,7 +837,7 @@ public void 
initializeState(FunctionInitializationContext context) throws Except
nextTransactionalIdHintState = 
context.getOperatorStateStore().getUnionListState(
NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);
transactionalIdsGenerator = new TransactionalIdsGenerator(
-   getRuntimeContext().getTaskName(),
+   getRuntimeContext().getTaskName() + "-" + 
getRuntimeContext().getOperatorUniqueID(),
--- End diff --

You could probably use only the Operator ID here - the task name does not 
add to the uniqueness. Unless the purpose of the task name is "human 
readability" in log files or metrics.


---


[GitHub] flink pull request #5977: [FLINK-9295][kafka] Fix transactional.id collision...

2018-05-11 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5977#discussion_r187640541
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
 ---
@@ -71,6 +71,17 @@
@PublicEvolving
MetricGroup getMetricGroup();
 
+   /**
+* Returned value is guaranteed to be unique between operators within 
the same job and to be
+* stable and the same across job submissions.
+*
+* This operation is currently only supported in Streaming 
(DataStream) contexts.
+*
+* @return String representation of the operator's unique id.
+*/
+   @PublicEvolving
+   String getOperatorUniqueID();
--- End diff --

Rather than adding this here and failing for *DataSet* programs, how about 
adding this to `StreamingRuntimeContext` and casting inside the Kafka Producer? 
Not super pretty, but nicer than having something that looks like a pretty 
generic concept (operator id) throwing an exception in a whole class of 
programs (batch jobs). This problem should go away anyways with the batch / 
streaming unification later.
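
A minimal sketch of what that cast could look like, assuming the accessor lived on 
`StreamingRuntimeContext` as suggested; the helper class is purely illustrative:

```
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;

final class OperatorIdAccess {

    // Only streaming programs reach this code path in the producer, so the cast is safe there.
    static String operatorUniqueId(RuntimeContext context) {
        if (context instanceof StreamingRuntimeContext) {
            return ((StreamingRuntimeContext) context).getOperatorUniqueID();
        }
        throw new IllegalStateException("Operator IDs are only available in streaming programs.");
    }
}
```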


---


[GitHub] flink issue #5973: [FLINK-9261][ssl] Fix SSL support for REST API and Web UI...

2018-05-11 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5973
  
Merged in `master` in 3afa5eb3c47158086ab29012a835f96682a85d34


---


[GitHub] flink issue #5982: [FLINK-9325][checkpoint]generate the meta file for checkp...

2018-05-11 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5982
  
I think this fix here might not work for S3, because a rename() with the S3 
file systems will actually trigger a copy (or even a download and upload), so 
it is not a cheap operation.

Then we can fix this by adding a `create(...)` method (or mode) to the 
FileSystem API that does not publish data in the file until `close()` is 
called. For hdfs:// and file://, this would use a temp file with renaming; 
for S3, we don't write to a temp file, because S3 makes the file visible only on 
close() anyways.
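
Purely as an illustration of that semantics (not the actual FileSystem API; all names 
here are made up), the rename-on-close variant could look roughly like this:

```
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

final class AtomicCreateSketch {

    // For file:// (and similarly hdfs://): write to a temp file and rename on close.
    static OutputStream createAtomically(Path target) throws IOException {
        Path temp = target.resolveSibling(target.getFileName() + ".inprogress");
        OutputStream out = Files.newOutputStream(temp);
        return new FilterOutputStream(out) {
            @Override
            public void close() throws IOException {
                super.close();
                Files.move(temp, target, StandardCopyOption.ATOMIC_MOVE);
            }
        };
    }
    // For S3, no temp file is needed: the object only becomes visible once the upload completes.
}
```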


---


[GitHub] flink pull request #5970: [FLINK-9292] [core] Remove TypeInfoParser (part 2)

2018-05-11 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5970#discussion_r187610790
  
--- Diff: 
flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java ---
@@ -109,7 +112,7 @@ public String map(MyPojo value) throws Exception {
@Test
public void testForwardWithGenericTypePublicAttrAccess() {

compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map4.class,
-   
"org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo", "String");
+   TypeInformation.of(new 
TypeHint>(){}), Types.STRING);
--- End diff --

Then the code needs to be `new GenericTypeInfo<>(MyPojo.class)`. Otherwise 
it will try to determine what the Type Info for `GenericType` is, and it just 
happens to be a generic type ;-)


---


[GitHub] flink issue #5962: [FLINK-9304] Timer service shutdown should not stop if in...

2018-05-11 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5962
  
The comment to make at least one attempt sounds good.

If you refactor this a little bit, you can actually test it. Either make it 
a static method to which you pass the timer service, or better, make it a 
method `shutdownUninterruptible(Deadline, Logger)` on the TimerService itself.
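
Something like the following sketch would make it testable in isolation (a plain 
`Duration` stands in for a `Deadline` here, and the awaited shutdown method is assumed):

```
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;

interface TimerServiceSketch {

    // Assumed to exist on the timer service: waits for pending timers up to the given time.
    boolean shutdownAndAwaitPending(long time, TimeUnit unit) throws InterruptedException;

    // Makes at least one attempt and keeps retrying until the deadline, deferring interrupts.
    default void shutdownUninterruptible(Duration deadline, Logger log) {
        final long deadlineNanos = System.nanoTime() + deadline.toNanos();
        boolean shutdownComplete = false;
        boolean interrupted = false;
        do {
            try {
                long remaining = Math.max(0L, deadlineNanos - System.nanoTime());
                shutdownComplete = shutdownAndAwaitPending(remaining, TimeUnit.NANOSECONDS);
            } catch (InterruptedException e) {
                interrupted = true;
                log.warn("Interrupted while waiting for the timer service shutdown, retrying.");
            }
        } while (!shutdownComplete && System.nanoTime() < deadlineNanos);

        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
```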


---


[GitHub] flink pull request #5970: [FLINK-9292] [core] Remove TypeInfoParser (part 2)

2018-05-11 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5970#discussion_r187591783
  
--- Diff: 
flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java ---
@@ -109,7 +112,7 @@ public String map(MyPojo value) throws Exception {
@Test
public void testForwardWithGenericTypePublicAttrAccess() {

compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map4.class,
-   
"org.apache.flink.api.java.sca.UdfAnalyzerTest$MyPojo", "String");
+   TypeInformation.of(new 
TypeHint>(){}), Types.STRING);
--- End diff --

The `GenericTypeInfo` here seems to be incorrect.


---


[GitHub] flink pull request #5970: [FLINK-9292] [core] Remove TypeInfoParser (part 2)

2018-05-11 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5970#discussion_r187589977
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/common/typeinfo/TypeInformationTest.java
 ---
@@ -61,7 +61,7 @@ public void testOfGenericClassForGenericType() {
@Test
public void testOfTypeHint() {
assertEquals(BasicTypeInfo.STRING_TYPE_INFO, 
TypeInformation.of(String.class));
-   assertEquals(BasicTypeInfo.STRING_TYPE_INFO, 
TypeInformation.of(new TypeHint(){}));
+   assertEquals(BasicTypeInfo.STRING_TYPE_INFO, Types.STRING);
--- End diff --

Please undo this one, the test explicitly tests the TypeHint use.


---


[GitHub] flink pull request #5970: [FLINK-9292] [core] Remove TypeInfoParser (part 2)

2018-05-11 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5970#discussion_r187592909
  
--- Diff: 
flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java ---
@@ -268,8 +274,8 @@ public void 
testForwardIntoNestedTupleWithVarAndModification() {
 
@Test
public void testForwardIntoTupleWithAssignment() {
-   
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map14.class, 
"Tuple2",
-   "Tuple2");
+   
compareAnalyzerResultWithAnnotationsSingleInput(MapFunction.class, Map14.class,
+   TypeInformation.of(new TypeHint>(){}), TypeInformation.of(new TypeHint>(){}));
--- End diff --

There is so much use of the types `Tuple2>` and 
`Tuple2` that it would make sense to factor these out into static 
fields and reference them from there (see the sketch below).
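
A small sketch of that clean-up, using `Tuple2<String, Integer>` only as a stand-in 
for the actual type parameters (field and class names are illustrative):

```
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;

final class CommonTypeInfos {

    // Defined once, referenced from the individual test cases.
    static final TypeInformation<Tuple2<String, Integer>> STRING_INT_TUPLE2 =
            TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {});

    static final TypeInformation<String> STRING = TypeInformation.of(String.class);
}
```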


---


[GitHub] flink issue #5987: [FLINK-9043][CLI]Automatically search for the last succes...

2018-05-11 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5987
  
Sorry, but can we take a step back and first agree on what we actually want 
the behavior to be before jumping into an implementation?


---


[GitHub] flink issue #5958: [FLINK-8500] Get the timestamp of the Kafka message from ...

2018-05-11 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5958
  
If it turns out that we need to do a bit more design work on the 
deserialization schema, we could incrementally fix the issue that triggered 
this PR by actually extending the KeyedDeserializationSchema with a new default 
method.


---


[GitHub] flink issue #5958: [FLINK-8500] Get the timestamp of the Kafka message from ...

2018-05-11 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5958
  
There are a few bigger design aspects that we need to agree upon:

  - The `DeserializationSchema` is a shared common denominator of 
serialization schemata. That's why it is in `flink-core` and not in a Kafka 
connector project. It is used by various per-record streaming sources, like 
Kafka, RabbitMQ, in the future PubSub, or AMQ. It may be valuable to migrate 
Kinesis also to that. This PR changes the common denominator to have very 
Kafka-specific fields.

  - The common schema makes sense because we can offer a library of 
implementations, like for Avro, Json, Thrift, Protobuf. All connectors can use 
hardened implementations for these types, or integrations with schema 
registries.

  - This surfaces for example in the SQL / Table API, which is currently 
making an effort to change their source API to have "connector" and "format" 
aspects orthogonal. You define a table as "from kafka with Avro", or "from 
row-wise file with JSON", etc.

  - We should think of having something similar in the future in the 
unified batch/streaming DataStream API as well, when we rework our source 
interface. At least a "row-wise source" that can then use all these format 
implementations.

That means we are in a bit of a conflict between "common deserialization 
schema" interface and surfacing connector specific information.
One way to approach that might be making the connector-specific 
deserializer classes subclasses of the common one, and let them use specialized 
subclasses of ConsumerRecordMetaInfo that have the additional fields.
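
Roughly, that subclassing idea could look like the sketch below; all names here are 
placeholders, chosen only to show the "common base plus connector-specific specialization" shape:

```
import java.io.IOException;
import java.io.Serializable;

// Common denominator shared by all per-record sources.
class SourceRecord {
    byte[] value;
}

// Connector-specific specialization carrying the extra Kafka fields.
class KafkaSourceRecord extends SourceRecord {
    byte[] key;
    String topic;
    int partition;
    long offset;
    long timestamp;
}

interface CommonDeserializationSchema<T> extends Serializable {
    T deserialize(SourceRecord record) throws IOException;
}

interface KafkaDeserializationSchema<T> extends CommonDeserializationSchema<T> {
    // Kafka sources call this variant; generic format implementations only see the base method.
    default T deserialize(KafkaSourceRecord record) throws IOException {
        return deserialize((SourceRecord) record);  // cast picks the common-base overload
    }
}
```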

On a separate note, I think that `ConsumerRecordMetaInfo` is not the best 
name, because the type has not only the meta info, but the actual record. So we 
could call it `Record` or `Datum` or `SourceRecord`, etc.


---


[GitHub] flink issue #5973: [FLINK-9261][ssl] Fix SSL support for REST API and Web UI...

2018-05-09 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5973
  
Merging this...


---


[GitHub] flink pull request #5980: [FLINK-9324] Wait for slot release before completi...

2018-05-09 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5980#discussion_r187191338
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java
 ---
@@ -159,10 +159,51 @@ public SlotSharingGroupId getSlotSharingGroupId() {
 * the logical slot.
 *
 * @param cause of the payload release
-* @return true if the logical slot's payload could be released, 
otherwise false
 */
@Override
-   public boolean release(Throwable cause) {
-   return releaseSlot(cause).isDone();
+   public void release(Throwable cause) {
+   if (STATE_UPDATER.compareAndSet(this, State.ALIVE, 
State.RELEASING)) {
+   signalPayloadRelease(cause);
+   }
+   state = State.RELEASED;
+   releaseFuture.complete(null);
+   }
+
+   private CompletableFuture signalPayloadRelease(Throwable cause) {
+   tryAssignPayload(TERMINATED_PAYLOAD);
+   payload.fail(cause);
+
+   return payload.getTerminalStateFuture();
+   }
+
+   private void returnSlotToOwner(CompletableFuture 
terminalStateFuture) {
+   final CompletableFuture slotReturnFuture = 
terminalStateFuture.handle((Object ignored, Throwable throwable) -> {
+   if (state == State.RELEASING) {
--- End diff --

What happens if this gets set concurrently to `RELEASED`? This would only 
work if `slotOwner.returnAllocatedSlot(this)` works in both cases (releasing 
and released) and the second path (returning the completed future) is the 
optimization/fast path if it is already released. (double checking the 
assumption).


---


[GitHub] flink pull request #5980: [FLINK-9324] Wait for slot release before completi...

2018-05-09 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5980#discussion_r187189887
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java
 ---
@@ -159,10 +159,51 @@ public SlotSharingGroupId getSlotSharingGroupId() {
 * the logical slot.
 *
 * @param cause of the payload release
-* @return true if the logical slot's payload could be released, 
otherwise false
 */
@Override
-   public boolean release(Throwable cause) {
-   return releaseSlot(cause).isDone();
+   public void release(Throwable cause) {
+   if (STATE_UPDATER.compareAndSet(this, State.ALIVE, 
State.RELEASING)) {
+   signalPayloadRelease(cause);
+   }
+   state = State.RELEASED;
+   releaseFuture.complete(null);
+   }
+
+   private CompletableFuture signalPayloadRelease(Throwable cause) {
+   tryAssignPayload(TERMINATED_PAYLOAD);
+   payload.fail(cause);
+
+   return payload.getTerminalStateFuture();
+   }
+
+   private void returnSlotToOwner(CompletableFuture 
terminalStateFuture) {
+   final CompletableFuture slotReturnFuture = 
terminalStateFuture.handle((Object ignored, Throwable throwable) -> {
+   if (state == State.RELEASING) {
+   return slotOwner.returnAllocatedSlot(this);
+   } else {
+   return CompletableFuture.completedFuture(true);
+   }
+   }).thenCompose(Function.identity());
+
+   slotReturnFuture.whenComplete(
+   (Object ignored, Throwable throwable) -> {
+   state = State.RELEASED;
--- End diff --

Mutating a private member field from within a lambda - would it make sense 
to have a (package) private method `markReleased()` or so?


---


[GitHub] flink issue #5978: [FLINK-8554] Upgrade AWS SDK

2018-05-09 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5978
  
Before merging this, we need to
  - diff the dependency trees between the versions
  - clear all licenses for the dependency changes. put changes into the 
NOTICE files
  - check that no unshaded classes are added to the jar



---


[GitHub] flink issue #5975: [FLINK-9138][docs][tests] Make ConfigOptionsDocsCompleten...

2018-05-09 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5975
  
+1


---


[GitHub] flink pull request #5973: [FLINK-9261][ssl] Fix SSL support for REST API and...

2018-05-09 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5973#discussion_r187025268
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java ---
@@ -81,16 +85,62 @@ public static void 
setSSLVerAndCipherSuites(ServerSocket socket, Configuration c
}
}
 
+   /**
+* Creates a {@link SSLEngineFactory} to be used by the Server.
+*
+* @param config The application configuration.
+*/
+   public static SSLEngineFactory createServerSSLEngineFactory(final 
Configuration config) throws Exception {
+   return createSSLEngineFactory(config, false);
+   }
+
+   /**
+* Creates a {@link SSLEngineFactory} to be used by the Client.
+* @param config The application configuration.
+*/
+   public static SSLEngineFactory createClientSSLEngineFactory(final 
Configuration config) throws Exception {
+   return createSSLEngineFactory(config, true);
+   }
+
+   private static SSLEngineFactory createSSLEngineFactory(
+   final Configuration config,
+   final boolean clientMode) throws Exception {
+
+   final SSLContext sslContext = clientMode ?
+   createSSLClientContext(config) :
+   createSSLServerContext(config);
+
+   checkState(sslContext != null, "%s it not enabled", 
SecurityOptions.SSL_ENABLED.key());
+
+   return new SSLEngineFactory(
+   sslContext,
+   getEnabledProtocols(config),
+   getEnabledCipherSuites(config),
+   clientMode);
+   }
+
/**
 * Sets SSL version and cipher suites for SSLEngine.
-* @param engine
-*SSLEngine to be handled
-* @param config
-*The application configuration
+*
+* @param engine SSLEngine to be handled
+* @param config The application configuration
+* @deprecated Use {@link #createClientSSLEngineFactory(Configuration)} 
or
+* {@link #createServerSSLEngineFactory(Configuration)}.
 */
+   @Deprecated
public static void setSSLVerAndCipherSuites(SSLEngine engine, 
Configuration config) {
-   
engine.setEnabledProtocols(config.getString(SecurityOptions.SSL_PROTOCOL).split(","));
-   
engine.setEnabledCipherSuites(config.getString(SecurityOptions.SSL_ALGORITHMS).split(","));
+   engine.setEnabledProtocols(getEnabledProtocols(config));
+   engine.setEnabledCipherSuites(getEnabledCipherSuites(config));
+   }
+
+   private static String[] getEnabledProtocols(final Configuration config) 
{
+   requireNonNull(config, "config must not be null");
--- End diff --

For private internal utilities, I suggest skipping the null check in most 
places, especially when it will eagerly fail with an exception on null anyways.

In any case, if you believe the check should be there, please use 
`Preconditions.checkNotNull` rather than `requireNonNull`.


---


[GitHub] flink issue #5972: [FLINK-9323][build] Properly organize checkstyle-plugin c...

2018-05-09 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5972
  
+1 to the cleanup, but I think this might break the relative paths of the 
suppression files, hence causing the build to fail. We probably need to make the 
paths relative to a "root dir" or some "highest project dir" variable.


---


[GitHub] flink issue #5936: [FLINK-9265] Upgrade Prometheus version

2018-05-09 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5936
  
Looks good, thanks.

+1 to merge this


---


[GitHub] flink pull request #5966: [FLINK-9312] [security] Add mutual authentication ...

2018-05-08 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5966#discussion_r186800971
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
 ---
@@ -170,8 +171,8 @@ public void initChannel(SocketChannel channel) throws 
Exception {
 
localAddress = (InetSocketAddress) 
bindFuture.channel().localAddress();
 
-   long end = System.currentTimeMillis();
-   LOG.info("Successful initialization (took {} ms). Listening on 
SocketAddress {}.", (end - start), 
bindFuture.channel().localAddress().toString());
+   final long duration = (start - System.nanoTime()) / 1_000_000;
+   LOG.info("Successful initialization (took {} ms). Listening on 
SocketAddress {}.", duration, localAddress);
--- End diff --

You are absolutely right, thanks for catching this!


---


[GitHub] flink issue #5966: [FLINK-9312] [security] Add mutual authentication for RPC...

2018-05-07 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5966
  
@EronWright This might be interesting to you as well


---


[GitHub] flink pull request #5966: [FLINK-9312] [security] Add mutual authentication ...

2018-05-07 Thread StephanEwen
GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/5966

[FLINK-9312] [security] Add mutual authentication for RPC and data plane

## What is the purpose of the change

Currently, the Flink processes encrypted connections via SSL:
  - Data exchange TM - TM
  - RPC JM - TM
  - Blob Service JM - TM

  - (Optionally to ZooKeeper and connectors, this is connector specific and 
not in scope of this change)

However, the server side always accepts any client to build up the 
connection, meaning the connections are not strongly authenticated. Activating 
SSL mutual authentication strengthens this significantly - only processes that 
have access to the same certificate can connect.

## Brief change log

  - Activate mutual auth in akka (via akka config)
  - Activate mutual auth in Netty for data shuffles via `SSLContext` and 
`SSLEngine` parameters

## Verifying this change

  - Adds a test to the `NettyClientServerSslTest`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink mutual_auth

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5966.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5966


commit 8bceb03d5653c94247b72d6256f4e9e37b036e35
Author: Stephan Ewen 
Date:   2018-05-07T17:44:33Z

[FLINK-9313] [security] Activate mutual authentication for RPC/akka

commit 59b017580d30904418e0867ac122a8183dc5db70
Author: Stephan Ewen 
Date:   2018-05-07T19:28:41Z

[FLINK-9314] [security] Add mutual authentication for Netty / TaskManager's 
data plane




---


[GitHub] flink issue #5965: [FLINK-9310] [security] Update standard cipher suites for...

2018-05-07 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5965
  
@EronWright This might be interesting to you.


---


[GitHub] flink pull request #5965: [FLINK-9310] [security] Update standard cipher sui...

2018-05-07 Thread StephanEwen
GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/5965

[FLINK-9310] [security] Update standard cipher suites for secure mode

## What is the purpose of the change

This sets the cipher suits accepted by default to those recommended in
IETF RFC 7525 : https://tools.ietf.org/html/rfc7525

## Brief change log

Updates the default value of the respective config option to
```

TLS_DHE_RSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,TLS_DHE_RSA_WITH_AES_256_GCM_SHA384,TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384
```

## Verifying this change

This change is already covered by the existing tests that test SSL setups.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (not applicable / **docs** / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink 
update_cipher_suits

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5965.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5965


commit 9b24574cd437ddbc2d3546c1fa0f73e983c02e31
Author: Stephan Ewen 
Date:   2018-05-07T17:47:00Z

[FLINK-9310] [security] Update standard cipher suites for secure mode

This sets the cipher suits accepted by default to those recommended in
IETF RFC 7525 : https://tools.ietf.org/html/rfc7525




---


[GitHub] flink issue #5936: [FLINK-9265] Upgrade Prometheus version

2018-05-07 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5936
  
Yes, before giving +1 to this commit, we need to check that it introduces 
no new transitive dependencies, or make sure that any new dependency is not an 
issue.


---


[GitHub] flink issue #5427: [FLINK-8533] [checkpointing] Support MasterTriggerRestore...

2018-05-07 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5427
  
Thanks a lot, looks good, can merge this now.

One quick question: You decided to have empty default implementations for 
the new methods in the master hook interface. Given that Pravega is currently 
the only known user of that interface, I would be okay with breaking the 
interface (no default methods) if you think that would be cleaner.


---


[GitHub] flink pull request #5954: [FLINK-9276] Improve error message when TaskManage...

2018-05-07 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5954#discussion_r186376326
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java
 ---
@@ -296,7 +296,7 @@ public TaskManagerLocation addTaskManager(int 
numberSlots) {
@Override
public void releaseTaskManager(ResourceID resourceId) {
try {
-   slotPool.releaseTaskManager(resourceId).get();
+   slotPool.releaseTaskManager(resourceId, 
null).get();
--- End diff --

I would avoid passing null as the exception here.


---


[GitHub] flink pull request #5954: [FLINK-9276] Improve error message when TaskManage...

2018-05-07 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5954#discussion_r186376122
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
 ---
@@ -1070,13 +1071,22 @@ protected void 
timeoutPendingSlotRequest(SlotRequestId slotRequestId) {
removePendingRequest(slotRequestId);
}
 
-   private void releaseTaskManagerInternal(final ResourceID resourceId) {
-   final FlinkException cause = new FlinkException("Releasing 
TaskManager " + resourceId + '.');
+   private void releaseTaskManagerInternal(final ResourceID resourceId, 
final Exception cause) {
--- End diff --

I think we can just use the `cause` exception and do not need to re-wrap it 
into another exception.

If re-wrapping is needed for some reason, we need to add the original 
exception as the cause.


---


[GitHub] flink pull request #5954: [FLINK-9276] Improve error message when TaskManage...

2018-05-07 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5954#discussion_r186376182
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
 ---
@@ -86,9 +86,10 @@
 * Releases a TaskExecutor with the given {@link ResourceID} from the 
{@link SlotPool}.
 *
 * @param resourceId identifying the TaskExecutor which shall be 
released from the SlotPool
+* @param cause for the release the TaskManager
 * @return Future acknowledge which is completed after the TaskExecutor 
has been released
 */
-   CompletableFuture releaseTaskManager(final ResourceID 
resourceId);
+   CompletableFuture releaseTaskManager(final ResourceID 
resourceId, final Exception cause);
--- End diff --

Throwable, see above.


---


[GitHub] flink pull request #5954: [FLINK-9276] Improve error message when TaskManage...

2018-05-07 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5954#discussion_r186375693
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
 ---
@@ -212,7 +212,7 @@ public void start(JobMasterId jobMasterId, String 
newJobManagerAddress) throws E
 
// release all registered slots by releasing the corresponding 
TaskExecutors
for (ResourceID taskManagerResourceId : registeredTaskManagers) 
{
-   releaseTaskManagerInternal(taskManagerResourceId);
+   releaseTaskManagerInternal(taskManagerResourceId, null);
--- End diff --

I would try to avoid null here. It is still helpful to have a message that 
says "SlotPool is shutting down".


---


[GitHub] flink pull request #5954: [FLINK-9276] Improve error message when TaskManage...

2018-05-07 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5954#discussion_r186375841
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
 ---
@@ -1050,11 +1050,12 @@ else if (availableSlots.tryRemove(allocationID)) {
 * when we find some TaskManager becomes "dead" or "abnormal", and we 
decide to not using slots from it anymore.
 *
 * @param resourceId The id of the TaskManager
+* @param cause for the release the TaskManager
 */
@Override
-   public CompletableFuture releaseTaskManager(final 
ResourceID resourceId) {
+   public CompletableFuture releaseTaskManager(final 
ResourceID resourceId, final Exception cause) {
--- End diff --

I would use `Throwable` in the signatures. It may always be that some Error 
is the cause (class not found, etc.)


---


[GitHub] flink issue #5931: [FLINK-9190][flip6,yarn] Request new container if contain...

2018-05-05 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5931
  
@sihuazhou and @shuai-xu thank you for your help in understanding the bug 
here.

Let me rephrase it to make sure I understand the problem exactly. The steps 
are the following:

  1. JobMaster / SlotPool requests a slot (AllocationID) from the 
ResourceManager
  2. ResourceManager starts a container with a TaskManager
  3. TaskManager registers at ResourceManager, which tells the TaskManager 
to push a slot to the JobManager.
  4. TaskManager container is killed
  5. The ResourceManager does not queue back the slot requests 
(AllocationIDs) that it sent to the previous TaskManager, so the requests are 
lost and need to time out before another attempt is tried.

Some thoughts on how to deal with this:
  - It seems the ResourceManager should put the slots from the TaskManager 
that failed back to "pending", so they are given to the next TaskManager 
that starts.
  - I assume that is not happening because there is concern that the 
failure is also detected on the JobManager/SlotPool and retried there, which 
would lead to double retries
  - The solution would be to better define the protocol with respect to who 
is responsible for what retries.

Two ideas on how to fix that:
  1. The ResourceManager notifies the SlotPool that a certain set of 
AllocationIDs has failed, and the SlotPool directly retries the allocations, 
resulting in directly starting new containers.
  2. The ResourceManager always retries allocations for AllocationIDs it 
knows. The SlotPool would not retry; it would keep the same allocations 
unless they are released as unneeded. We would probably need something to make 
sure that the SlotPool can distinguish between different offers of the same 
AllocationID (in case the ResourceManager assumes a timeout but a request goes 
actually through) - possibly something like an attempt counter (higher wins; see the sketch below).
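
Purely hypothetical sketch of that attempt-counter comparison in (2); none of these 
names exist in Flink, they only illustrate the "higher attempt wins" rule:

```
// Hypothetical bookkeeping on the SlotPool side (all names are made up).
final class AllocationOffer {
    final String allocationId;
    final int attempt;

    AllocationOffer(String allocationId, int attempt) {
        this.allocationId = allocationId;
        this.attempt = attempt;
    }
}

final class OfferBookkeeping {
    // Accept the offer if it is the first one for this AllocationID, or a newer attempt.
    static boolean shouldAccept(AllocationOffer current, AllocationOffer incoming) {
        return current == null || incoming.attempt > current.attempt;
    }
}
```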

@tillrohrmann also interested in your thoughts here.



---


[GitHub] flink issue #5951: [FLINK-9293] [runtime] SlotPool should check slot id when...

2018-05-03 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5951
  
Good catch! Thank you for the PR.

Will try to review this asap...


---


[GitHub] flink pull request #5928: [hotfix][doc] fix doc of externalized checkpoint

2018-05-03 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5928#discussion_r185737131
  
--- Diff: docs/ops/state/state_backends.md ---
@@ -152,7 +152,7 @@ Possible values for the config entry are *jobmanager* 
(MemoryStateBackend), *fil
 name of the class that implements the state backend factory 
[FsStateBackendFactory](https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java),
 such as 
`org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory` for 
RocksDBStateBackend.
 
-In the case where the default state backend is set to *filesystem*, the 
entry `state.backend.fs.checkpointdir` defines the directory where the 
checkpoint data will be stored.
+In the case where the default state backend is set to *filesystem*, the 
entry `state.checkpoints.dir` defines the directory where the checkpoint data 
will be stored.
--- End diff --

The option is used by all backends that eventually store data to file 
system, including RocksDBStateBackend and MemoryStateBackend (the 
MemoryStateBackend writes its single checkpoint metadata file there, optionally)


---


[GitHub] flink issue #5928: [hotfix][doc] fix doc of externalized checkpoint

2018-05-03 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5928
  
In Flink 1.5, all checkpoints are externalized. There is no notion of 
externalized checkpoints any more, just a setting to configure the retention 
(retain on cancel, retain on fail, never retain).

The `state.checkpoints.dir` is the recommended option for the directory 
where checkpoints are stored (and `state.backend.fs.checkpointdir` is a 
deprecated key for the same setting).


---


[GitHub] flink issue #5929: [FLINK-8497] [connectors] KafkaConsumer throws NPE if top...

2018-05-03 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5929
  
I am not sure if failing is the right behavior here.

What if someone has a Flink job running and one of the topics for which 
partitions are discovered gets deleted? That would fail the job in such a way 
that it never recovers, even if other partitions are still valid.


---


[GitHub] flink issue #5936: [FLINK-9265] Upgrade Prometheus version

2018-05-03 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5936
  
Could you add the dependency tree of the upgraded Prometheus dependency to 
check?


---


[GitHub] flink issue #5939: [FLINK-8500] [Kafka Connector] Get the timestamp of the K...

2018-05-03 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5939
  
The feature is a nice addition.

Flink currently already adds the timestamp as the record's event time 
timestamp. You can access it via a ProcessFunction. That is a tad bit more 
clumsy, though...

If we want to have the timestamp as part of the Deserialization Schema, I 
would suggest not adding yet another specialized schema, but extending the 
KeyedDeserializationSchema with another method that takes the timestamp. We 
should make that a default method that calls the existing method, and make the 
existing method an empty default method.
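
A hedged sketch of what that could look like; the first method roughly follows the 
existing KeyedDeserializationSchema signature, while the new timestamp-aware method 
and the exact shapes are assumptions, not the final API:

```
import java.io.IOException;
import java.io.Serializable;

public interface KeyedDeserializationSchemaSketch<T> extends Serializable {

    // Existing method, now with an empty default implementation.
    default T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)
            throws IOException {
        return null;
    }

    // New method that also exposes the record timestamp; by default it calls the existing method.
    default T deserialize(
            byte[] messageKey, byte[] message, String topic, int partition, long offset, long timestamp)
            throws IOException {
        return deserialize(messageKey, message, topic, partition, offset);
    }
}
```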

We could also think about renaming `KeyedDeserializationSchema` to 
`RichDeserializationSchema` or so, if that would describe the functionality 
better (I am not a native speaker, so it would be nice if one could weigh in 
here).


---


[GitHub] flink pull request #5944: [FLINK-8900] [yarn] Set correct application status...

2018-05-03 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5944#discussion_r185732563
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
 ---
@@ -131,6 +133,17 @@ protected JobGraph retrieveJobGraph(Configuration 
configuration) throws FlinkExc
}
}
 
+   @Override
+   protected void 
registerShutdownActions(CompletableFuture terminationFuture) 
{
+   terminationFuture.whenComplete((status, throwable) ->
--- End diff --

Will update


---


[GitHub] flink pull request #5944: [FLINK-8900] [yarn] Set correct application status...

2018-05-03 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5944#discussion_r185732507
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
 ---
@@ -109,7 +119,11 @@ public MiniDispatcher(
 
if (executionMode == ClusterEntrypoint.ExecutionMode.NORMAL) {
// terminate the MiniDispatcher once we served the 
first JobResult successfully
-   jobResultFuture.whenComplete((JobResult ignored, 
Throwable throwable) -> shutDown());
+   jobResultFuture.whenComplete((JobResult result, 
Throwable throwable) -> {
+   ApplicationStatus status = 
result.getSerializedThrowable().isPresent() ?
+   ApplicationStatus.FAILED : 
ApplicationStatus.SUCCEEDED;
+   jobTerminationFuture.complete(status);
--- End diff --

True, I had it like that initially, but found the above version more 
readable in the end, because we don't really use the serializedThrowable 
(making the map() a bit strange).


---


[GitHub] flink pull request #5944: [FLINK-8900] [yarn] Set correct application status...

2018-05-03 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5944#discussion_r185732539
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
 ---
@@ -109,7 +119,11 @@ public MiniDispatcher(
 
if (executionMode == ClusterEntrypoint.ExecutionMode.NORMAL) {
// terminate the MiniDispatcher once we served the 
first JobResult successfully
-   jobResultFuture.whenComplete((JobResult ignored, 
Throwable throwable) -> shutDown());
+   jobResultFuture.whenComplete((JobResult result, 
Throwable throwable) -> {
--- End diff --

Will update


---


[GitHub] flink pull request #5948: [FLINK-9286][docs] Update classloading docs

2018-05-03 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5948#discussion_r185731303
  
--- Diff: docs/monitoring/debugging_classloading.md ---
@@ -69,9 +69,9 @@ cases. By default, Java ClassLoaders will first look for 
classes in the parent C
 the child ClassLoader for cases where we have a hierarchy of ClassLoaders. 
This is problematic if you
 have in your user jar a version of a library that conflicts with a version 
that comes with Flink. You can
 change this behaviour by configuring the ClassLoader resolution order via
-`classloader.resolve-order: child-first` in the Flink config. However, 
Flink classes will still
+[classloader.resolve-order](../ops/config.html#classloader-resolve-order) 
in the Flink config. However, Flink classes will still
 be resolved through the parent ClassLoader first, although you can also 
configure this via
-`classloader.parent-first-patterns` (see [config](../ops/config.html))

+[classloader.parent-first-patterns-default](../ops/config.html#classloader-parent-first-patterns-default).
--- End diff --

How about referencing `classloader.parent-first-patterns-additional` here 
instead? The recommendation is not to change the value for "default"...


---


[GitHub] flink issue #5949: [FLINK-9288][docs] clarify the event time / watermark doc...

2018-05-03 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5949
  
Looks good, thanks, merging this...


---


[GitHub] flink issue #5900: [FLINK-9222][docs] add documentation for setting up Gradl...

2018-04-30 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5900
  
Nice addition!

A few things I would like to double check on the quickstart configuration 
(I am not fluent enough in Gradle):

  - We do not need to hide/shade any dependencies in the user code. In 
Maven, we use the shade plugin, but only to build an uber jar, not to actually 
relocate dependencies. Is that the same in the Gradle quickstart?

  - The Flink core dependencies need to be in a scope equivalent to 
"provided", so they do not end up in the uber jar. Can we do something similar 
in Gradle? This has been a frequent source of unnecessarily bloated application 
jars.

  - The Maven quickstart template uses a trick to make sure that the 
provided dependencies are still in the classpath when we run the program in the 
IDE: A profile that activates in IDEA (by a property variable) and alters the 
scope from *provided* to *compile*. Not sure if that is strictly necessary, but 
may be helpful.


---


[GitHub] flink issue #5914: [FLINK-9256][network] fix NPE in SingleInputGate#updateIn...

2018-04-30 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5914
  
Change looks very good, thanks!
Merging this...

We can probably remove most of this code again later, once we drop the 
non-credit-based code paths in the next releases. But this is still a 
necessary fix for now...


---


[GitHub] flink issue #5916: [hotfix][tests] remove redundant rebalance in SuccessAfte...

2018-04-30 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5916
  
Merging...


---


[GitHub] flink issue #5942: [FLINK-9274][kafka] add thread name for partition discove...

2018-04-30 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5942
  
Good fix, thanks!

Merging...


---


[GitHub] flink issue #5943: [FLINK-9275][streaming] add taskName to the output flushe...

2018-04-30 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5943
  
Good fix, thanks, merging...


---


[GitHub] flink issue #5938: [FLINK-9196][flip6, yarn] Cleanup application files when ...

2018-04-30 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5938
  
Merging this...


---


[GitHub] flink pull request #5938: [FLINK-9196][flip6, yarn] Cleanup application file...

2018-04-30 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5938#discussion_r185080370
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -570,6 +571,21 @@ public LeaderConnectionInfo getClusterConnectionInfo() 
throws LeaderRetrievalExc
});
}
 
+   @Override
+   public void shutDownCluster() {
+   try {
+   sendRetryableRequest(
+   ShutdownHeaders.getInstance(),
+   EmptyMessageParameters.getInstance(),
+   EmptyRequestBody.getInstance(),
+   isConnectionProblemException()).get();
+   } catch (InterruptedException e) {
+   Thread.currentThread().interrupt();
+   } catch (ExecutionException e) {
+   log.error("Error while shutting down cluster", e);
--- End diff --

Throw the cause of the `ExecutionException`?


---


[GitHub] flink issue #5944: [FLINK-8900] [yarn] Set correct application status when j...

2018-04-30 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5944
  
The test failure is unrelated - just test flakiness.


---


[GitHub] flink issue #5931: [FLINK-9190][flip6,yarn] Request new container if contain...

2018-04-30 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5931
  
 @GJL Briefly digging through the log, there are a few strange things 
happening:

  -  `YarnResourceManager` still has 8 pending requests even when 11 
containers are running:
```Received new container: container_1524853016208_0001_01_000184 - 
Remaining pending container requests: 8```

  - Some slots are requested and then the requests are cancelled again
  - In the end, one request is not fulfilled: 
`aeec2a9f010a187e04e31e6efd6f0f88`

Might be an inconsistency either in the `SlotManager` or the `SlotPool`.


---


[GitHub] flink pull request #5944: [FLINK-8900] [yarn] Set correct application status...

2018-04-30 Thread StephanEwen
GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/5944

[FLINK-8900] [yarn] Set correct application status when job is finished

## What is the purpose of the change

When finite Flink applications (batch jobs) are sent to YARN in the 
detached mode, the final status is currently always the same, because the job's 
result is not passed to the logic that initiates the application shutdown.

This PR forwards the final job status via a future that is used to register 
the shutdown handlers.

## Brief change log

  - Introduce the `JobTerminationFuture` in the `MiniDispatcher`
  - 

## Verifying this change

```
bin/flink run -m yarn-cluster -yjm 2048 -ytm 2048  
./examples/streaming/WordCount.jar
```

  - Run the batch job as described above on YARN to succeed, check that the 
final application status is successful.

  - Run the batch job with a parameter to a non existing input file on 
YARN, check that the final application status is failed.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no)**
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink yarn_fix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5944.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5944


commit f4130c64420e2ad2acb680869c9b84aa5dbcc7c7
Author: Stephan Ewen 
Date:   2018-04-30T07:55:50Z

[hotfix] [tests] Update log4j-test.properties

Brings the logging definition in sync with other projects.
Updates the classname for the suppressed logger in Netty to account for the 
new
shading model introduced in Flink 1.4.

commit 5fcc9aca392cbcd5dfa474b0a286868b44836f23
Author: Stephan Ewen 
Date:   2018-04-27T16:57:27Z

[FLINK-8900] [yarn] Set correct application status when job is finished




---


[GitHub] flink issue #5897: [FLINK-9234] [table] Fix missing dependencies for externa...

2018-04-30 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5897
  
Can we actually get rid of `commons-configuration` in the table API?

All the commons packages with their weird long tail of not properly 
declared dependencies have become a bit of an anti-pattern to me over time...


---


[GitHub] flink issue #5924: [hotfix][README.md] Update building prerequisites

2018-04-30 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5924
  
Thanks, merging this...


---


[GitHub] flink issue #5934: [FLINK-9269][state] fix concurrency problem when performi...

2018-04-30 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5934
  
Concerning serializer snapshots:
  - We need to move away from Java-serializing the serializers into the 
config snapshots anyways, and should do that in the near future.
  - I think the config snapshot should be created once when the state is 
created, encoded as `byte[]`, and then we only write the bytes. That saves us 
from repeated work on every checkpoint and would also prevent concurrent access 
to the serializer for creating the snapshot.


---


[GitHub] flink issue #5892: [FLINK-9214] YarnClient should be stopped in YARNSessionC...

2018-04-30 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5892
  
Thanks, looks good, merging this...


---


[GitHub] flink issue #5928: [hotfix][doc] fix doc of externalized checkpoint

2018-04-30 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5928
  
The configuration (`config.md`) should be generated from the config options 
by now, so it should not be manually edited.

(@zentol could you chime in here?)


---


[GitHub] flink pull request #5928: [hotfix][doc] fix doc of externalized checkpoint

2018-04-30 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5928#discussion_r185021293
  
--- Diff: docs/dev/stream/state/checkpointing.md ---
@@ -137,11 +137,9 @@ Some more parameters and/or defaults may be set via 
`conf/flink-conf.yaml` (see
-  `jobmanager`: In-memory state, backup to JobManager's/ZooKeeper's 
memory. Should be used only for minimal state (Kafka offsets) or testing and 
local debugging.
-  `filesystem`: State is in-memory on the TaskManagers, and state 
snapshots are stored in a file system. Supported are all filesystems supported 
by Flink, for example HDFS, S3, ...
 
-- `state.backend.fs.checkpointdir`: Directory for storing checkpoints in a 
Flink supported filesystem. Note: State backend must be accessible from the 
JobManager, use `file://` only for local setups.
+- `state.checkpoints.dir`: The target directory for storing checkpoints 
data files and meta data of [externalized checkpoints]({{ site.baseurl 
}}/ops/state/checkpoints.html#externalized-checkpoints) in a Flink supported 
filesystem. Note: State backend must be accessible from the JobManager, use 
`file://` only for local setups.
--- End diff --

Yes, `file:///` is what you use for many NAS style storage systems, so it 
is not local-only. Let's change this to say that the storage path must be 
accessible from all participating processes/nodes, i.e., all TaskManagers and 
JobManagers


---

