[flink] branch master updated: [FLINK-12374][table-planner-blink] Support translation from StreamExecTableSourceScan/BatchExecTableSourceScan to StreamTransformation. (#8407)
This is an automated email from the ASF dual-hosted git repository. kurt pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 2826ff8 [FLINK-12374][table-planner-blink] Support translation from StreamExecTableSourceScan/BatchExecTableSourceScan to StreamTransformation. (#8407) 2826ff8 is described below commit 2826ff80c4b056d7af589649238b3acabca43837 Author: Jing Zhang AuthorDate: Tue May 14 09:21:39 2019 +0800 [FLINK-12374][table-planner-blink] Support translation from StreamExecTableSourceScan/BatchExecTableSourceScan to StreamTransformation. (#8407) --- .../PlannerResolvedFieldReference.scala| 26 ++ .../logical/FlinkLogicalTableSourceScan.scala | 15 +- .../physical/batch/BatchExecTableSourceScan.scala | 62 +++- .../stream/StreamExecTableSourceScan.scala | 148 +- .../apache/flink/table/plan/util/ScanUtil.scala| 2 +- .../flink/table/sources/TableSourceUtil.scala | 321 - .../table/sources/definedTimeAttributes.scala | 95 ++ .../table/sources/tsextractors/ExistingField.scala | 108 +++ .../sources/wmstrategies/AscendingTimestamps.scala | 49 .../sources/wmstrategies/watermarkStrategies.scala | 81 ++ .../table/runtime/batch/sql/TableScanITCase.scala | 119 .../table/runtime/stream/sql/TableScanITCase.scala | 144 + .../apache/flink/table/util/testTableSources.scala | 106 +++ 13 files changed, 1254 insertions(+), 22 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerResolvedFieldReference.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerResolvedFieldReference.scala new file mode 100644 index 000..a3406f8 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/expressions/PlannerResolvedFieldReference.scala @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.expressions + +import org.apache.flink.api.common.typeinfo.TypeInformation + +case class PlannerResolvedFieldReference( +name: String, +resultType: TypeInformation[_], +fieldIndex: Int) extends ResolvedFieldReference diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala index cb94b3a..7b2b7a3 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala @@ -18,7 +18,6 @@ package org.apache.flink.table.plan.nodes.logical -import org.apache.flink.table.api.TableException import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.plan.nodes.FlinkConventions import org.apache.flink.table.plan.nodes.logical.FlinkLogicalTableSourceScan.isTableSourceScan @@ -49,7 +48,9 @@ class FlinkLogicalTableSourceScan( extends TableScan(cluster, traitSet, relOptTable) with FlinkLogicalRel { - val tableSource: TableSource[_] = relOptTable.unwrap(classOf[TableSourceTable[_]]).tableSource + lazy val tableSource: TableSource[_] = tableSourceTable.tableSource + + private lazy val tableSourceTable = relOptTable.unwrap(classOf[TableSourceTable[_]]) def copy( traitSet: RelTraitSet, @@ -63,15 +64,7 @@ class FlinkLogicalTableSourceScan( override def deriveRowType(): RelDataType = { val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory] - -tableSource match { - case s: StreamTableSource[_] => -TableSourceUtil.getRelDataType(s, None, streaming = true, flinkTypeFactory) - case _: BatchTableSource[_] => -flinkTypeFactory.buildLogicalRowType( - tableSource.getTableSchema, isStreaming = Op
[flink] branch master updated: [FLINK-12233][hive] Support table related operations in HiveCatalog
This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new f919b23 [FLINK-12233][hive] Support table related operations in HiveCatalog f919b23 is described below commit f919b23890913aa8d3f6241a1d70ef82006863fc Author: bowen.li AuthorDate: Tue May 7 11:52:17 2019 -0700 [FLINK-12233][hive] Support table related operations in HiveCatalog This PR introduced HiveCatalogTable and implemented table related catalog APIs in HiveCatalog. This closes #8353. --- .../catalog/hive/GenericHiveMetastoreCatalog.java | 238 ++--- .../flink/table/catalog/hive/HiveCatalog.java | 110 +++--- .../flink/table/catalog/hive/HiveCatalogBase.java | 190 .../flink/table/catalog/hive/HiveCatalogTable.java | 105 + .../flink/table/catalog/hive/HiveCatalogUtil.java | 1 - .../flink/table/catalog/hive/HiveTableConfig.java | 4 +- .../hive/util/GenericHiveMetastoreCatalogUtil.java | 171 --- .../HiveTableUtil.java}| 31 ++- .../hive/GenericHiveMetastoreCatalogTest.java | 3 +- .../flink/table/catalog/hive/HiveCatalogTest.java | 110 -- .../flink/table/catalog/hive/HiveTestUtils.java| 7 + .../table/catalog/GenericInMemoryCatalogTest.java | 4 +- .../flink/table/catalog/CatalogTestBase.java | 33 ++- .../flink/table/catalog/CatalogTestUtil.java | 13 +- 14 files changed, 594 insertions(+), 426 deletions(-) diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java index 843b4a2..c6d14f4 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java @@ -18,12 +18,17 @@ package org.apache.flink.table.catalog.hive; +import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.catalog.CatalogBaseTable; import org.apache.flink.table.catalog.CatalogDatabase; import org.apache.flink.table.catalog.CatalogFunction; import org.apache.flink.table.catalog.CatalogPartition; import org.apache.flink.table.catalog.CatalogPartitionSpec; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.CatalogView; import org.apache.flink.table.catalog.GenericCatalogDatabase; +import org.apache.flink.table.catalog.GenericCatalogTable; +import org.apache.flink.table.catalog.GenericCatalogView; import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.catalog.exceptions.CatalogException; import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; @@ -33,25 +38,28 @@ import org.apache.flink.table.catalog.exceptions.FunctionNotExistException; import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException; import org.apache.flink.table.catalog.exceptions.PartitionNotExistException; import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException; -import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; import org.apache.flink.table.catalog.exceptions.TableNotExistException; import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException; import org.apache.flink.table.catalog.hive.util.GenericHiveMetastoreCatalogUtil; +import org.apache.flink.table.catalog.hive.util.HiveTableUtil; import org.apache.flink.table.catalog.stats.CatalogColumnStatistics; import org.apache.flink.table.catalog.stats.CatalogTableStatistics; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.TableType; -import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; import org.apache.hadoop.hive.metastore.api.Database; -import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.SerDeInfo; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.metastore.api.UnknownDBException; -import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; /** * A catalog that persists all Flink streaming and batch metadata by using Hive metastore as a persistent storage. @@ -59,6 +67,10 @@ import java.util.List; public class GenericHiveMetastoreCatalog extends HiveCatalog
[flink] branch master updated: [hotfix][table-planner] Port TimeIndicatorTypeInfo to table-common
This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new e8df4fb [hotfix][table-planner] Port TimeIndicatorTypeInfo to table-common e8df4fb is described below commit e8df4fb1e50ffc353e807eaf1aa3ee106b16559a Author: Dawid Wysakowicz AuthorDate: Tue May 7 18:09:37 2019 +0200 [hotfix][table-planner] Port TimeIndicatorTypeInfo to table-common This closes #8363. --- .../table/typeutils/TimeIndicatorTypeInfo.java | 77 ++ .../table/typeutils/TimeIndicatorTypeInfo.scala| 59 - 2 files changed, 77 insertions(+), 59 deletions(-) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.java new file mode 100644 index 000..e46bf3f --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.typeutils; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.common.typeutils.base.SqlTimestampComparator; +import org.apache.flink.api.common.typeutils.base.SqlTimestampSerializer; + +import java.sql.Timestamp; + +/** + * Type information for indicating event or processing time. However, it behaves like a + * regular SQL timestamp but is serialized as Long. + */ +@Internal +public class TimeIndicatorTypeInfo extends SqlTimeTypeInfo { + + private final boolean isEventTime; + + public static final int ROWTIME_STREAM_MARKER = -1; + public static final int PROCTIME_STREAM_MARKER = -2; + + public static final int ROWTIME_BATCH_MARKER = -3; + public static final int PROCTIME_BATCH_MARKER = -4; + + public static final TimeIndicatorTypeInfo ROWTIME_INDICATOR = new TimeIndicatorTypeInfo(true); + public static final TimeIndicatorTypeInfo PROCTIME_INDICATOR = new TimeIndicatorTypeInfo(false); + + @SuppressWarnings("unchecked") + protected TimeIndicatorTypeInfo(boolean isEventTime) { + super(Timestamp.class, SqlTimestampSerializer.INSTANCE, (Class) SqlTimestampComparator.class); + this.isEventTime = isEventTime; + } + + // this replaces the effective serializer by a LongSerializer + // it is a hacky but efficient solution to keep the object creation overhead low but still + // be compatible with the corresponding SqlTimestampTypeInfo + @Override + @SuppressWarnings("unchecked") + public TypeSerializer createSerializer(ExecutionConfig executionConfig) { + return (TypeSerializer) LongSerializer.INSTANCE; + } + + public boolean isEventTime() { + return isEventTime; + } + + @Override + public String toString() { + if (isEventTime) { + return "TimeIndicatorTypeInfo(rowtime)"; + } else { + return "TimeIndicatorTypeInfo(proctime)"; + } + } +} + diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.scala deleted file mode 100644 index ad82d52..000 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.scala +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding cop
[flink] branch master updated: [FLINK-12159]. Enable YarnMiniCluster integration test under non-secure mode
This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 0e953d1 [FLINK-12159]. Enable YarnMiniCluster integration test under non-secure mode 0e953d1 is described below commit 0e953d10c2cdc61dc978a13c0c94320034bf0179 Author: Jeff Zhang AuthorDate: Wed Apr 10 22:53:25 2019 +0800 [FLINK-12159]. Enable YarnMiniCluster integration test under non-secure mode Remove setting of yarn.minicluster.fixed.ports copy yarn-site.xml to target/test-classes This commit closes #8144. --- .../java/org/apache/flink/yarn/YarnTestBase.java | 21 +++- .../flink/yarn/AbstractYarnClusterDescriptor.java | 38 +- .../src/main/java/org/apache/flink/yarn/Utils.java | 12 --- 3 files changed, 35 insertions(+), 36 deletions(-) diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java index bd46dc4..9aad148 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java @@ -154,6 +154,8 @@ public abstract class YarnTestBase extends TestLogger { protected static File tempConfPathForSecureRun = null; protected static File flinkShadedHadoopDir; + protected static File yarnSiteXML = null; + private YarnClient yarnClient = null; private static org.apache.flink.configuration.Configuration globalConfiguration; @@ -166,7 +168,6 @@ public abstract class YarnTestBase extends TestLogger { YARN_CONFIGURATION = new YarnConfiguration(); YARN_CONFIGURATION.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 32); YARN_CONFIGURATION.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 4096); // 4096 is the available memory anyways - YARN_CONFIGURATION.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true); YARN_CONFIGURATION.setBoolean(YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME, true); YARN_CONFIGURATION.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); YARN_CONFIGURATION.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 2); @@ -340,15 +341,14 @@ public abstract class YarnTestBase extends TestLogger { } } - public static File writeYarnSiteConfigXML(Configuration yarnConf) throws IOException { - tmp.create(); - File yarnSiteXML = new File(tmp.newFolder().getAbsolutePath() + "/yarn-site.xml"); - + // write yarn-site.xml to target/test-classes so that flink pick can pick up this when + // initializing YarnClient properly from classpath + public static void writeYarnSiteConfigXML(Configuration yarnConf, File targetFolder) throws IOException { + yarnSiteXML = new File(targetFolder, "/yarn-site.xml"); try (FileWriter writer = new FileWriter(yarnSiteXML)) { yarnConf.writeXml(writer); writer.flush(); } - return yarnSiteXML; } /** @@ -584,9 +584,10 @@ public abstract class YarnTestBase extends TestLogger { map.put(ConfigConstants.ENV_FLINK_CONF_DIR, configDir); - File yarnConfFile = writeYarnSiteConfigXML(conf); - map.put("YARN_CONF_DIR", yarnConfFile.getParentFile().getAbsolutePath()); + File targetTestClassesFolder = new File("target/test-classes"); + writeYarnSiteConfigXML(conf, targetTestClassesFolder); map.put("IN_TESTS", "yes we are in tests"); // see YarnClusterDescriptor() for more infos + map.put("YARN_CONF_DIR", targetTestClassesFolder.getAbsolutePath()); TestBaseUtils.setEnv(map); Assert.assertTrue(yarnCluster.getServiceState() == Service.STATE.STARTED); @@ -890,6 +891,10 @@ public abstract class YarnTestBase extends TestLogger { tempConfPathForSecureRun = null; } + if (yarnSiteXML != null) { + yarnSiteXML.delete(); + } + // When we are on travis, we copy the temp files of JUnit (containing the MiniYARNCluster log files) // to /target/flink-yarn-tests-*. // The files from there are picked up by the ./tools/travis_watchdog.sh script diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java index 23fddd5..20b5417
[flink] branch release-1.8 updated: [FLINK-12301] Fix ScalaCaseClassSerializer to support value types
This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch release-1.8 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.8 by this push: new ecc6639 [FLINK-12301] Fix ScalaCaseClassSerializer to support value types ecc6639 is described below commit ecc6639053cb36672ce552bb7626f75ff98b8293 Author: Igal Shilman AuthorDate: Mon May 13 11:00:37 2019 +0200 [FLINK-12301] Fix ScalaCaseClassSerializer to support value types We now use Scala reflection because it correctly deals with Scala language features. --- .../scala/typeutils/ScalaCaseClassSerializer.scala | 65 +++--- .../ScalaCaseClassSerializerReflectionTest.scala | 41 ++ 2 files changed, 50 insertions(+), 56 deletions(-) diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaCaseClassSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaCaseClassSerializer.scala index 7ff1427..fbaa2ac 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaCaseClassSerializer.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaCaseClassSerializer.scala @@ -18,15 +18,14 @@ package org.apache.flink.api.scala.typeutils +import java.io.ObjectInputStream + import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot.SelfResolvingTypeSerializer import org.apache.flink.api.common.typeutils._ import org.apache.flink.api.java.typeutils.runtime.TupleSerializerConfigSnapshot import org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer.lookupConstructor -import java.io.ObjectInputStream -import java.lang.invoke.{MethodHandle, MethodHandles} - import scala.collection.JavaConverters._ import scala.reflect.runtime.universe @@ -38,16 +37,16 @@ import scala.reflect.runtime.universe */ @SerialVersionUID(1L) class ScalaCaseClassSerializer[T <: Product]( - clazz: Class[T], - scalaFieldSerializers: Array[TypeSerializer[_]] -) extends CaseClassSerializer[T](clazz, scalaFieldSerializers) -with SelfResolvingTypeSerializer[T] { +clazz: Class[T], +scalaFieldSerializers: Array[TypeSerializer[_]] +) extends CaseClassSerializer[T](clazz, scalaFieldSerializers) + with SelfResolvingTypeSerializer[T] { @transient private var constructor = lookupConstructor(clazz) override def createInstance(fields: Array[AnyRef]): T = { -constructor.invoke(fields).asInstanceOf[T] +constructor(fields) } override def snapshotConfiguration(): TypeSerializerSnapshot[T] = { @@ -55,8 +54,7 @@ class ScalaCaseClassSerializer[T <: Product]( } override def resolveSchemaCompatibilityViaRedirectingToNewSnapshotClass( -s: TypeSerializerConfigSnapshot[T] - ): TypeSerializerSchemaCompatibility[T] = { + s: TypeSerializerConfigSnapshot[T]): TypeSerializerSchemaCompatibility[T] = { require(s.isInstanceOf[TupleSerializerConfigSnapshot[_]]) @@ -85,22 +83,8 @@ class ScalaCaseClassSerializer[T <: Product]( object ScalaCaseClassSerializer { - def lookupConstructor[T](clazz: Class[_]): MethodHandle = { -val types = findPrimaryConstructorParameterTypes(clazz, clazz.getClassLoader) - -val constructor = clazz.getConstructor(types: _*) - -val handle = MethodHandles - .lookup() - .unreflectConstructor(constructor) - .asSpreader(classOf[Array[AnyRef]], types.length) - -handle - } - - private def findPrimaryConstructorParameterTypes(cls: Class[_], cl: ClassLoader): - List[Class[_]] = { -val rootMirror = universe.runtimeMirror(cl) + def lookupConstructor[T](cls: Class[T]): Array[AnyRef] => T = { +val rootMirror = universe.runtimeMirror(cls.getClassLoader) val classSymbol = rootMirror.classSymbol(cls) require( @@ -113,30 +97,21 @@ object ScalaCaseClassSerializer { |""".stripMargin ) -val primaryConstructorSymbol = findPrimaryConstructorMethodSymbol(classSymbol) -val scalaTypes = getArgumentsTypes(primaryConstructorSymbol) -scalaTypes.map(tpe => scalaTypeToJavaClass(rootMirror)(tpe)) - } - - private def findPrimaryConstructorMethodSymbol(classSymbol: universe.ClassSymbol): - universe.MethodSymbol = { -classSymbol.toType +val primaryConstructorSymbol = classSymbol.toType .decl(universe.termNames.CONSTRUCTOR) .alternatives + .collectFirst({ +case constructorSymbol: universe.MethodSymbol if constructorSymbol.isPrimaryConstructor => + constructorSymbol + }) .head .asMethod - } - private def getArgumentsTypes(primaryConstructorSymbol: universe.MethodSymbol): - List[universe.Type] = { -primaryConstructorSymbol.typeSignature - .paramLists - .head - .map(sy
[flink] branch master updated: [FLINK-12301] Fix ScalaCaseClassSerializer to support value types
This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 9caf2c4 [FLINK-12301] Fix ScalaCaseClassSerializer to support value types 9caf2c4 is described below commit 9caf2c4355f851c7a8ca2b1fe9a1c6dab7bd95e3 Author: Igal Shilman AuthorDate: Mon May 13 11:00:37 2019 +0200 [FLINK-12301] Fix ScalaCaseClassSerializer to support value types We now use Scala reflection because it correctly deals with Scala language features. --- .../scala/typeutils/ScalaCaseClassSerializer.scala | 65 +++--- .../ScalaCaseClassSerializerReflectionTest.scala | 41 ++ 2 files changed, 50 insertions(+), 56 deletions(-) diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaCaseClassSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaCaseClassSerializer.scala index 7ff1427..fbaa2ac 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaCaseClassSerializer.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaCaseClassSerializer.scala @@ -18,15 +18,14 @@ package org.apache.flink.api.scala.typeutils +import java.io.ObjectInputStream + import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot.SelfResolvingTypeSerializer import org.apache.flink.api.common.typeutils._ import org.apache.flink.api.java.typeutils.runtime.TupleSerializerConfigSnapshot import org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer.lookupConstructor -import java.io.ObjectInputStream -import java.lang.invoke.{MethodHandle, MethodHandles} - import scala.collection.JavaConverters._ import scala.reflect.runtime.universe @@ -38,16 +37,16 @@ import scala.reflect.runtime.universe */ @SerialVersionUID(1L) class ScalaCaseClassSerializer[T <: Product]( - clazz: Class[T], - scalaFieldSerializers: Array[TypeSerializer[_]] -) extends CaseClassSerializer[T](clazz, scalaFieldSerializers) -with SelfResolvingTypeSerializer[T] { +clazz: Class[T], +scalaFieldSerializers: Array[TypeSerializer[_]] +) extends CaseClassSerializer[T](clazz, scalaFieldSerializers) + with SelfResolvingTypeSerializer[T] { @transient private var constructor = lookupConstructor(clazz) override def createInstance(fields: Array[AnyRef]): T = { -constructor.invoke(fields).asInstanceOf[T] +constructor(fields) } override def snapshotConfiguration(): TypeSerializerSnapshot[T] = { @@ -55,8 +54,7 @@ class ScalaCaseClassSerializer[T <: Product]( } override def resolveSchemaCompatibilityViaRedirectingToNewSnapshotClass( -s: TypeSerializerConfigSnapshot[T] - ): TypeSerializerSchemaCompatibility[T] = { + s: TypeSerializerConfigSnapshot[T]): TypeSerializerSchemaCompatibility[T] = { require(s.isInstanceOf[TupleSerializerConfigSnapshot[_]]) @@ -85,22 +83,8 @@ class ScalaCaseClassSerializer[T <: Product]( object ScalaCaseClassSerializer { - def lookupConstructor[T](clazz: Class[_]): MethodHandle = { -val types = findPrimaryConstructorParameterTypes(clazz, clazz.getClassLoader) - -val constructor = clazz.getConstructor(types: _*) - -val handle = MethodHandles - .lookup() - .unreflectConstructor(constructor) - .asSpreader(classOf[Array[AnyRef]], types.length) - -handle - } - - private def findPrimaryConstructorParameterTypes(cls: Class[_], cl: ClassLoader): - List[Class[_]] = { -val rootMirror = universe.runtimeMirror(cl) + def lookupConstructor[T](cls: Class[T]): Array[AnyRef] => T = { +val rootMirror = universe.runtimeMirror(cls.getClassLoader) val classSymbol = rootMirror.classSymbol(cls) require( @@ -113,30 +97,21 @@ object ScalaCaseClassSerializer { |""".stripMargin ) -val primaryConstructorSymbol = findPrimaryConstructorMethodSymbol(classSymbol) -val scalaTypes = getArgumentsTypes(primaryConstructorSymbol) -scalaTypes.map(tpe => scalaTypeToJavaClass(rootMirror)(tpe)) - } - - private def findPrimaryConstructorMethodSymbol(classSymbol: universe.ClassSymbol): - universe.MethodSymbol = { -classSymbol.toType +val primaryConstructorSymbol = classSymbol.toType .decl(universe.termNames.CONSTRUCTOR) .alternatives + .collectFirst({ +case constructorSymbol: universe.MethodSymbol if constructorSymbol.isPrimaryConstructor => + constructorSymbol + }) .head .asMethod - } - private def getArgumentsTypes(primaryConstructorSymbol: universe.MethodSymbol): - List[universe.Type] = { -primaryConstructorSymbol.typeSignature - .paramLists - .head - .map(symbol => sy
[flink] branch master updated: [FLINK-12164][runtime] Harden JobMasterTest against timeouts
This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 9c2bcae [FLINK-12164][runtime] Harden JobMasterTest against timeouts 9c2bcae is described below commit 9c2bcae735cd00336af3284e4f631afe061552ba Author: Chesnay Schepler AuthorDate: Thu May 9 17:01:37 2019 +0200 [FLINK-12164][runtime] Harden JobMasterTest against timeouts This closes #8388. --- .../src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java index 52792f8..ff9d6ed 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java @@ -202,7 +202,7 @@ public class JobMasterTest extends TestLogger { private static final Time testingTimeout = Time.seconds(10L); private static final long fastHeartbeatInterval = 1L; - private static final long fastHeartbeatTimeout = 5L; + private static final long fastHeartbeatTimeout = 10L; private static final long heartbeatInterval = 1000L; private static final long heartbeatTimeout = 5_000_000L;
[flink] branch master updated: [FLINK-12493] Add step to export Hadoop classpath to docs
This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 518616e [FLINK-12493] Add step to export Hadoop classpath to docs 518616e is described below commit 518616e9f3527b2564a640b38c80db3eb54bef37 Author: Alberto Romero AuthorDate: Mon May 13 11:04:17 2019 +0100 [FLINK-12493] Add step to export Hadoop classpath to docs Github PR #8422 --- docs/ops/deployment/aws.md | 8 +++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/docs/ops/deployment/aws.md b/docs/ops/deployment/aws.md index b465340..16e9bd7 100644 --- a/docs/ops/deployment/aws.md +++ b/docs/ops/deployment/aws.md @@ -56,7 +56,13 @@ when creating an EMR cluster. After creating your cluster, you can [connect to the master node](http://docs.aws.amazon.com/ElasticMapReduce/latest/ManagementGuide/emr-connect-master-node.html) and install Flink: 1. Go the [Downloads Page]({{ site.download_url }}) and **download a binary version of Flink matching the Hadoop version** of your EMR cluster, e.g. Hadoop 2.7 for EMR releases 4.3.0, 4.4.0, or 4.5.0. -2. Extract the Flink distribution and you are ready to deploy [Flink jobs via YARN](yarn_setup.html) after **setting the Hadoop config directory**: +2. Make sure all the Hadoop dependencies are in the classpath before you submit any jobs to EMR: + +{% highlight bash %} +export HADOOP_CLASSPATH=`hadoop classpath` +{% endhighlight %} + +3. Extract the Flink distribution and you are ready to deploy [Flink jobs via YARN](yarn_setup.html) after **setting the Hadoop config directory**: {% highlight bash %} HADOOP_CONF_DIR=/etc/hadoop/conf ./bin/flink run -m yarn-cluster -yn 1 examples/streaming/WordCount.jar
[flink] branch master updated: [FLINK-12495][python][client] Move PythonGatewayServer into flink-clients.
This is an automated email from the ASF dual-hosted git repository. jincheng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new d1542e9 [FLINK-12495][python][client] Move PythonGatewayServer into flink-clients. d1542e9 is described below commit d1542e9561c6235feb902c9c6d781ba416b8f784 Author: sunjincheng121 AuthorDate: Mon May 13 14:50:37 2019 +0800 [FLINK-12495][python][client] Move PythonGatewayServer into flink-clients. This closes #8423 --- flink-clients/pom.xml | 7 +++ .../java/org/apache/flink/client}/python/PythonGatewayServer.java | 2 +- flink-core/pom.xml | 7 --- flink-python/pyflink/java_gateway.py | 2 +- pom.xml| 1 + 5 files changed, 10 insertions(+), 9 deletions(-) diff --git a/flink-clients/pom.xml b/flink-clients/pom.xml index 6476798..514b6cd 100644 --- a/flink-clients/pom.xml +++ b/flink-clients/pom.xml @@ -68,6 +68,13 @@ under the License. commons-cli + + + net.sf.py4j + py4j + ${py4j.version} + + diff --git a/flink-core/src/main/java/org/apache/flink/api/python/PythonGatewayServer.java b/flink-clients/src/main/java/org/apache/flink/client/python/PythonGatewayServer.java similarity index 98% rename from flink-core/src/main/java/org/apache/flink/api/python/PythonGatewayServer.java rename to flink-clients/src/main/java/org/apache/flink/client/python/PythonGatewayServer.java index 3e17401..6432a67 100644 --- a/flink-core/src/main/java/org/apache/flink/api/python/PythonGatewayServer.java +++ b/flink-clients/src/main/java/org/apache/flink/client/python/PythonGatewayServer.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.api.python; +package org.apache.flink.client.python; import py4j.GatewayServer; diff --git a/flink-core/pom.xml b/flink-core/pom.xml index c6ff872..0768e81 100644 --- a/flink-core/pom.xml +++ b/flink-core/pom.xml @@ -86,13 +86,6 @@ under the License. flink-shaded-guava - - - net.sf.py4j - py4j - 0.10.8.1 - - diff --git a/flink-python/pyflink/java_gateway.py b/flink-python/pyflink/java_gateway.py index 5c931de..ed1dc89 100644 --- a/flink-python/pyflink/java_gateway.py +++ b/flink-python/pyflink/java_gateway.py @@ -62,7 +62,7 @@ def launch_gateway(): raise Exception("Windows system is not supported currently.") script = "./bin/pyflink-gateway-server.sh" command = [os.path.join(FLINK_HOME, script)] -command += ['-c', 'org.apache.flink.api.python.PythonGatewayServer'] +command += ['-c', 'org.apache.flink.client.python.PythonGatewayServer'] # Create a temporary directory where the gateway server should write the connection information. conn_info_dir = tempfile.mkdtemp() diff --git a/pom.xml b/pom.xml index 539e9d0..ed9ae64 100644 --- a/pom.xml +++ b/pom.xml @@ -128,6 +128,7 @@ under the License. 2.21.0 2.0.0-RC.4 1.3 + 0.10.8.1 false validate
[flink] branch master updated: [FLINK-12421][docs-zh] Synchronize the latest documentation changes (commits to 754cd71d) into Chinese documents
This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 3f532e1 [FLINK-12421][docs-zh] Synchronize the latest documentation changes (commits to 754cd71d) into Chinese documents 3f532e1 is described below commit 3f532e18b96b83abdde189c4304f66c60b285d5c Author: Jark Wu AuthorDate: Tue May 7 10:11:08 2019 +0800 [FLINK-12421][docs-zh] Synchronize the latest documentation changes (commits to 754cd71d) into Chinese documents This closes #8354 --- docs/_includes/sidenav.html | 5 +- docs/dev/api_concepts.zh.md | 2 +- docs/dev/connectors/kafka.md| 2 +- docs/dev/connectors/kafka.zh.md | 2 +- docs/dev/execution.zh.md| 2 +- docs/dev/execution_configuration.zh.md | 2 +- docs/dev/execution_plans.zh.md | 2 +- docs/dev/packaging.zh.md| 4 +- docs/dev/parallel.zh.md | 2 +- docs/dev/restart_strategies.zh.md | 2 +- docs/dev/stream/state/queryable_state.zh.md | 44 +-- docs/dev/table/functions.zh.md | 402 +++- docs/dev/table/tableApi.zh.md | 294 +++- docs/ops/deployment/yarn_setup.zh.md| 1 + docs/ops/index.zh.md| 2 +- 15 files changed, 673 insertions(+), 95 deletions(-) diff --git a/docs/_includes/sidenav.html b/docs/_includes/sidenav.html index 247a0e3..7f200b1 100644 --- a/docs/_includes/sidenav.html +++ b/docs/_includes/sidenav.html @@ -116,7 +116,10 @@ level is determined by 'nav-pos'. {% capture collapse_target %}"#collapse-{{ i }}" data-toggle="collapse"{% if active %} class="active"{% endif %}{% endcapture %} {% capture expand %}{% unless active %} {% endunless %}{% endcapture %} {{ title }}{{ expand }} - {% if this.nav-show_overview %}Overview{% endif %} +{% if this.nav-show_overview %} + +{% if page.is_default_language %}Overview{% else %}概览{% endif %} +{% endif %} {% assign elements = elements | push: children %} {% assign elementsPosStack = elementsPosStack | push: elementsPos %} {% assign posStack = posStack | push: pos %} diff --git a/docs/dev/api_concepts.zh.md b/docs/dev/api_concepts.zh.md index bd7ca5a..619ee70 100644 --- a/docs/dev/api_concepts.zh.md +++ b/docs/dev/api_concepts.zh.md @@ -783,7 +783,7 @@ defined in the `write()`and `readFields()` methods will be used for serializatio You can use special types, including Scala's `Either`, `Option`, and `Try`. The Java API has its own custom implementation of `Either`. -Similarly to Scala's `Either`, it represents a value of one two possible types, *Left* or *Right*. +Similarly to Scala's `Either`, it represents a value of two possible types, *Left* or *Right*. `Either` can be useful for error handling or operators that need to output two different types of records. Type Erasure & Type Inference diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md index e4e4d5d..8f893c4 100644 --- a/docs/dev/connectors/kafka.md +++ b/docs/dev/connectors/kafka.md @@ -88,7 +88,7 @@ For most users, the `FlinkKafkaConsumer08` (part of `flink-connector-kafka`) is >= 1.0.0 This universal Kafka connector attempts to track the latest version of the Kafka client. -The version of the client it uses may change between Flink releases. As of this release, it uses the Kafka 2.2.0 client. +The version of the client it uses may change between Flink releases. Starting with Flink 1.9 release, it uses the Kafka 2.2.0 client. Modern Kafka clients are backwards compatible with broker versions 0.10.0 or later. However for Kafka 0.11.x and 0.10.x versions, we recommend using dedicated flink-connector-kafka-0.11{{ site.scala_version_suffix }} and flink-connector-kafka-0.10{{ site.scala_version_suffix }} respectively. diff --git a/docs/dev/connectors/kafka.zh.md b/docs/dev/connectors/kafka.zh.md index 2412277..28f3a25 100644 --- a/docs/dev/connectors/kafka.zh.md +++ b/docs/dev/connectors/kafka.zh.md @@ -88,7 +88,7 @@ For most users, the `FlinkKafkaConsumer08` (part of `flink-connector-kafka`) is >= 1.0.0 This universal Kafka connector attempts to track the latest version of the Kafka client. -The version of the client it uses may change between Flink releases. +The version of the client it uses may change between Flink releases. Starting with Flink 1.9 release, it uses the Kafka 2.2.0 client. Modern Kafka clients are backwards compatible with broker versions 0.10.0 or later. However for Kafka 0.11.x and 0.10.x versions, we recommend using dedicated
[flink] branch release-1.6 updated: [FLINK-12296][StateBackend] Fix local state directory collision with state loss for chained keyed operators
This is an automated email from the ASF dual-hosted git repository. srichter pushed a commit to branch release-1.6 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.6 by this push: new 0dda6fe [FLINK-12296][StateBackend] Fix local state directory collision with state loss for chained keyed operators 0dda6fe is described below commit 0dda6fe9dff4f667b110cda39bfe9738ba615b24 Author: Congxian Qiu AuthorDate: Mon May 13 16:29:34 2019 +0800 [FLINK-12296][StateBackend] Fix local state directory collision with state loss for chained keyed operators This closes #8338. --- .../streaming/state/RocksDBKeyedStateBackend.java | 16 +- .../tasks/OneInputStreamTaskTestHarness.java | 43 +++- .../runtime/tasks/StreamConfigChainer.java | 23 +- .../runtime/tasks/StreamMockEnvironment.java | 8 +- .../runtime/tasks/StreamTaskTestHarness.java | 21 +- .../state/StatefulOperatorChainedTaskTest.java | 259 + 6 files changed, 357 insertions(+), 13 deletions(-) diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index 17ba985..62c540f 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -270,6 +270,9 @@ public class RocksDBKeyedStateBackend extends AbstractKeyedStateBackend { /** Shared wrapper for batch writes to the RocksDB instance. */ private RocksDBWriteBatchWrapper writeBatchWrapper; + /** The local directory name of the current snapshot strategy. */ + private final String localDirectoryName; + public RocksDBKeyedStateBackend( String operatorIdentifier, ClassLoader userCodeClassLoader, @@ -319,6 +322,7 @@ public class RocksDBKeyedStateBackend extends AbstractKeyedStateBackend { this.restoredKvStateMetaInfos = new HashMap<>(); this.materializedSstFiles = new TreeMap<>(); this.backendUID = UUID.randomUUID(); + this.localDirectoryName = this.backendUID.toString().replaceAll("[\\-]", ""); this.snapshotStrategy = enableIncrementalCheckpointing ? new IncrementalSnapshotStrategy() : @@ -1977,17 +1981,17 @@ public class RocksDBKeyedStateBackend extends AbstractKeyedStateBackend { LocalRecoveryDirectoryProvider directoryProvider = localRecoveryConfig.getLocalStateDirectoryProvider(); File directory = directoryProvider.subtaskSpecificCheckpointDirectory(checkpointId); - if (directory.exists()) { - FileUtils.deleteDirectory(directory); - } - - if (!directory.mkdirs()) { + if (!directory.exists() && !directory.mkdirs()) { throw new IOException("Local state base directory for checkpoint " + checkpointId + " already exists: " + directory); } // introduces an extra directory because RocksDB wants a non-existing directory for native checkpoints. - File rdbSnapshotDir = new File(directory, "rocks_db"); + // append localDirectoryName here to solve directory collision problem when two stateful operators chained in one task. + File rdbSnapshotDir = new File(directory, localDirectoryName); + if (rdbSnapshotDir.exists()) { + FileUtils.deleteDirectory(rdbSnapshotDir); + } Path path = new Path(rdbSnapshotDir.toURI()); // create a "permanent" snapshot directory because local recovery is active. snapshotDirectory = SnapshotDirectory.permanent(path); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java index 89a4f81..0a7efda 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/run