[jira] [Commented] (FLINK-7928) Extend the filed in ResourceProfile for precisely calculating the resource of a task manager
    [ https://issues.apache.org/jira/browse/FLINK-7928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16292164#comment-16292164 ]

ASF GitHub Bot commented on FLINK-7928:
---------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4991

> Extend the filed in ResourceProfile for precisely calculating the resource of a task manager
> ---------------------------------------------------------------------------------------------
>
>                 Key: FLINK-7928
>                 URL: https://issues.apache.org/jira/browse/FLINK-7928
>             Project: Flink
>          Issue Type: Improvement
>          Components: JobManager, ResourceManager
>            Reporter: shuai.xu
>            Assignee: shuai.xu
>              Labels: flip-6
>             Fix For: 1.5.0
>
> ResourceProfile records all the resource requirements for a slot. It is
> generated by JobMaster and then passed to ResourceManager with the slot
> request.
> A task in the slot needs three parts of resource:
> 1. The resource for the operators, which is specified by the user-defined
> ResourceSpec.
> 2. The resource for the operators to communicate with their upstreams, for
> example the resource for buffer pools.
> 3. The resource for the operators to communicate with their downstreams.
> Same as above.
> So ResourceProfile should contain three parts of resource: the first part
> comes from ResourceSpec, and the other two parts are generated by JobMaster.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
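The three-part breakdown in the issue description can be sketched roughly as below. This is only an illustration and not Flink's actual `ResourceProfile`: the class name `SlotResourceSketch`, its fields, and the use of megabytes as the unit are all assumptions made for the example.

```java
// Hypothetical sketch; NOT Flink's ResourceProfile class.
final class SlotResourceSketch {

    // Part 1: what the operators need, taken from the user-defined ResourceSpec.
    private final int operatorMemoryMb;
    // Part 2: what is needed to talk to upstream tasks (e.g. input buffer pools).
    private final int upstreamNetworkMb;
    // Part 3: what is needed to talk to downstream tasks (e.g. output buffers).
    private final int downstreamNetworkMb;

    SlotResourceSketch(int operatorMemoryMb, int upstreamNetworkMb, int downstreamNetworkMb) {
        if (operatorMemoryMb < 0 || upstreamNetworkMb < 0 || downstreamNetworkMb < 0) {
            throw new IllegalArgumentException("resource amounts must not be negative");
        }
        this.operatorMemoryMb = operatorMemoryMb;
        this.upstreamNetworkMb = upstreamNetworkMb;
        this.downstreamNetworkMb = downstreamNetworkMb;
    }

    // The slot request would carry the sum of all three parts.
    int totalMemoryMb() {
        return operatorMemoryMb + upstreamNetworkMb + downstreamNetworkMb;
    }

    public static void main(String[] args) {
        SlotResourceSketch profile = new SlotResourceSketch(512, 64, 32);
        System.out.println("total MB requested for the slot: " + profile.totalMemoryMb());
    }
}
```

In this sketch only the first value would come from the user; the two network parts stand in for what the issue says the JobMaster should derive.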
[GitHub] flink pull request #4991: [FLINK-7928] [runtime] extend the resources in Res...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4991 ---
[jira] [Resolved] (FLINK-7928) Extend the filed in ResourceProfile for precisely calculating the resource of a task manager
     [ https://issues.apache.org/jira/browse/FLINK-7928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Till Rohrmann resolved FLINK-7928.
----------------------------------
       Resolution: Fixed
    Fix Version/s: 1.5.0

Fixed via 5643d156cea72314c2240119b30aa32a65a0aeb7

> Extend the filed in ResourceProfile for precisely calculating the resource of a task manager
> ---------------------------------------------------------------------------------------------
>
>                 Key: FLINK-7928
>                 URL: https://issues.apache.org/jira/browse/FLINK-7928
>             Project: Flink
>          Issue Type: Improvement
>          Components: JobManager, ResourceManager
>            Reporter: shuai.xu
>            Assignee: shuai.xu
>              Labels: flip-6
>             Fix For: 1.5.0
>
> ResourceProfile records all the resource requirements for a slot. It is
> generated by JobMaster and then passed to ResourceManager with the slot
> request.
> A task in the slot needs three parts of resource:
> 1. The resource for the operators, which is specified by the user-defined
> ResourceSpec.
> 2. The resource for the operators to communicate with their upstreams, for
> example the resource for buffer pools.
> 3. The resource for the operators to communicate with their downstreams.
> Same as above.
> So ResourceProfile should contain three parts of resource: the first part
> comes from ResourceSpec, and the other two parts are generated by JobMaster.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Commented] (FLINK-8258) Enable query configuration for batch queries
    [ https://issues.apache.org/jira/browse/FLINK-8258?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291887#comment-16291887 ]

ASF GitHub Bot commented on FLINK-8258:
---------------------------------------

GitHub user xccui opened a pull request:

    https://github.com/apache/flink/pull/5169

    [FLINK-8258] [table] Enable query configuration for batch queries

    ## What is the purpose of the change

    This PR enables the query configuration for queries in batch table environment.

    ## Brief change log

      - Adds a `BatchQueryConfig` parameter to `DataSetRel.translateToPlan()` and adjusts the corresponding subclasses.
      - Adds new `toDataset()` methods with `BatchQueryConfig` to `BatchTableEnvironment`.
      - Updates the `TableConversions.scala`.

    ## Verifying this change

    This change is a trivial rework / code cleanup without any test coverage.

    ## Does this pull request potentially affect one of the following parts:

      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
      - The S3 file system connector: (no)

    ## Documentation

      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not applicable)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/xccui/flink FLINK-8258

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5169.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5169

commit 9cb36b957fc4ed2ef6ad965304b759e9c7a53300
Author: Xingcan Cui
Date:   2017-12-14T09:37:40Z

    [FLINK-8258] [table] Enable query configuration for batch queries

> Enable query configuration for batch queries
> --------------------------------------------
>
>                 Key: FLINK-8258
>                 URL: https://issues.apache.org/jira/browse/FLINK-8258
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>            Reporter: Xingcan Cui
>            Assignee: Xingcan Cui
>
> Query configuration holds some parameters to configure the behavior of batch
> queries. However, since there was nothing to set for batch queries before,
> the configuration was not really passed. Due to FLINK-8236, we need to enable
> it now.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[GitHub] flink pull request #5165: [FLINK-8285] [table] Enable query configuration fo...
Github user xccui closed the pull request at: https://github.com/apache/flink/pull/5165 ---
[jira] [Commented] (FLINK-8265) Missing jackson dependency for flink-mesos
    [ https://issues.apache.org/jira/browse/FLINK-8265?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291605#comment-16291605 ]

Jared Stehler commented on FLINK-8265:
--------------------------------------

There is a shaded jackson class with jackson2 in the package, but none with the path shown. It seems like there is a missing include for jackson here in the flink-mesos pom?

{code}
<includes>
  <include>com.google.protobuf:protobuf-java</include>
  <include>org.apache.mesos:mesos</include>
  <include>com.netflix.fenzo:fenzo-core</include>
</includes>
<relocations>
  <relocation>
    <pattern>com.google.protobuf</pattern>
    <shadedPattern>org.apache.flink.mesos.shaded.com.google.protobuf</shadedPattern>
  </relocation>
  <relocation>
    <pattern>com.fasterxml.jackson</pattern>
    <shadedPattern>org.apache.flink.mesos.shaded.com.fasterxml.jackson</shadedPattern>
  </relocation>
</relocations>
{code}

As a workaround, I was able to build a simple shaded jackson deps jar relocated to "org.apache.flink.mesos.shaded.com.fasterxml.jackson" and add it to my /lib.

> Missing jackson dependency for flink-mesos
> ------------------------------------------
>
>                 Key: FLINK-8265
>                 URL: https://issues.apache.org/jira/browse/FLINK-8265
>             Project: Flink
>          Issue Type: Bug
>          Components: Mesos
>    Affects Versions: 1.4.0
>            Reporter: Eron Wright
>            Assignee: Eron Wright
>            Priority: Critical
>             Fix For: 1.4.1
>
> The Jackson library that is required by Fenzo is missing from the Flink
> distribution jar-file.
> This manifests as an exception in certain circumstances when a hard
> constraint is configured ("mesos.constraints.hard.hostattribute").
> {code}
> NoClassDefFoundError: org/apache/flink/mesos/shaded/com/fasterxml/jackson/databind/ObjectMapper
>     at com.netflix.fenzo.ConstraintFailure.<init>(ConstraintFailure.java:35)
>     at com.netflix.fenzo.AssignableVirtualMachine.findFailedHardConstraints(AssignableVirtualMachine.java:784)
>     at com.netflix.fenzo.AssignableVirtualMachine.tryRequest(AssignableVirtualMachine.java:581)
>     at com.netflix.fenzo.TaskScheduler.evalAssignments(TaskScheduler.java:796)
>     at com.netflix.fenzo.TaskScheduler.access$1500(TaskScheduler.java:70)
> {code}

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
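One way to confirm whether the relocated Jackson class is actually visible on a given classpath is a small probe like the one below. It is a generic diagnostic sketch using only `Class.forName` from the JDK; the class name `ShadedClassCheck` and the helper `isLoadable` are made up for this example and are not part of Flink or Fenzo.

```java
// Hypothetical diagnostic helper; not part of Flink.
final class ShadedClassCheck {

    // Returns true if the named class can be found by this class's class loader.
    // The class is looked up without being initialized.
    static boolean isLoadable(String className) {
        try {
            Class.forName(className, false, ShadedClassCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // The relocated name taken from the stack trace in the issue.
        String relocated =
            "org.apache.flink.mesos.shaded.com.fasterxml.jackson.databind.ObjectMapper";
        System.out.println(relocated + " loadable: " + isLoadable(relocated));
    }
}
```

Run with the same classpath as the failing process (for example, the Flink `lib` directory), this would be expected to report `false` before the workaround jar is added, assuming the problem is the one described in the issue.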
[jira] [Created] (FLINK-8265) Missing jackson dependency for flink-mesos
Eron Wright created FLINK-8265:
-------------------------------

             Summary: Missing jackson dependency for flink-mesos
                 Key: FLINK-8265
                 URL: https://issues.apache.org/jira/browse/FLINK-8265
             Project: Flink
          Issue Type: Bug
          Components: Mesos
    Affects Versions: 1.4.0
            Reporter: Eron Wright
            Assignee: Eron Wright
            Priority: Critical
             Fix For: 1.4.1

The Jackson library that is required by Fenzo is missing from the Flink distribution jar-file.

This manifests as an exception in certain circumstances when a hard constraint is configured ("mesos.constraints.hard.hostattribute").

{code}
NoClassDefFoundError: org/apache/flink/mesos/shaded/com/fasterxml/jackson/databind/ObjectMapper
    at com.netflix.fenzo.ConstraintFailure.<init>(ConstraintFailure.java:35)
    at com.netflix.fenzo.AssignableVirtualMachine.findFailedHardConstraints(AssignableVirtualMachine.java:784)
    at com.netflix.fenzo.AssignableVirtualMachine.tryRequest(AssignableVirtualMachine.java:581)
    at com.netflix.fenzo.TaskScheduler.evalAssignments(TaskScheduler.java:796)
    at com.netflix.fenzo.TaskScheduler.access$1500(TaskScheduler.java:70)
{code}

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Commented] (FLINK-8257) Unify the value checks for setParallelism()
    [ https://issues.apache.org/jira/browse/FLINK-8257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291536#comment-16291536 ]

Eron Wright commented on FLINK-8257:
------------------------------------

Related: in some cases the parallelism must be equal to that of the preceding operator, e.g. [DataStream::assignTimestampsAndWatermarks|https://github.com/apache/flink/blob/release-1.4.0/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java#L833]

> Unify the value checks for setParallelism()
> -------------------------------------------
>
>                 Key: FLINK-8257
>                 URL: https://issues.apache.org/jira/browse/FLINK-8257
>             Project: Flink
>          Issue Type: Improvement
>          Components: Configuration
>            Reporter: Xingcan Cui
>
> The {{setParallelism()}} method exists in many components at different
> levels. Some of the methods require the input value to be greater than {{1}}
> (e.g., {{StreamTransformation.setParallelism()}}), while some also allow
> the value to be {{ExecutionConfig.PARALLELISM_DEFAULT}}, which is {{-1}} by
> default (e.g., {{DataSink.setParallelism()}}). We need to unify the value
> checks for these methods.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
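A unified check along the lines the issue asks for might look like the sketch below. It is not taken from Flink: `ParallelismChecks` and `checkParallelism` are hypothetical names, the constant only mirrors the `-1` value the issue gives for `ExecutionConfig.PARALLELISM_DEFAULT`, and treating any value of at least 1 as a valid explicit parallelism is an assumption of this example.

```java
// Hypothetical helper; not part of Flink's API.
final class ParallelismChecks {

    // Mirrors the value the issue states for ExecutionConfig.PARALLELISM_DEFAULT.
    static final int PARALLELISM_DEFAULT = -1;

    // Accepts an explicit parallelism (assumed here to mean >= 1) and, if the
    // caller opts in, the "use the default" marker value.
    static int checkParallelism(int parallelism, boolean allowDefault) {
        boolean explicit = parallelism >= 1;
        boolean isDefault = allowDefault && parallelism == PARALLELISM_DEFAULT;
        if (!explicit && !isDefault) {
            throw new IllegalArgumentException(
                "Invalid parallelism " + parallelism
                    + (allowDefault ? " (expected >= 1 or PARALLELISM_DEFAULT)" : " (expected >= 1)"));
        }
        return parallelism;
    }

    public static void main(String[] args) {
        System.out.println(checkParallelism(4, false));
        System.out.println(checkParallelism(PARALLELISM_DEFAULT, true));
    }
}
```

Routing every `setParallelism()` through one such helper would make the difference between the two kinds of callers an explicit flag rather than a per-class convention.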
[GitHub] flink issue #5157: [hotfix] [docs] Consistent capitalization in Mesos docume...
Github user greghogan commented on the issue: https://github.com/apache/flink/pull/5157 @joerg84 could you file a PR for this change in `docs/ops/config.md` and elsewhere in `docs/ops/deployment/mesos.md`? ---
[jira] [Commented] (FLINK-8234) Cache JobExecutionResult from finished JobManagerRunners
    [ https://issues.apache.org/jira/browse/FLINK-8234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291336#comment-16291336 ]

ASF GitHub Bot commented on FLINK-8234:
---------------------------------------

Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5168#discussion_r157027881

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobExecutionResultCache.java ---
    @@ -0,0 +1,79 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.dispatcher;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.runtime.client.SerializedJobExecutionResult;
    +import org.apache.flink.types.Either;
    +
    +import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
    +import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
    +
    +import javax.annotation.Nullable;
    +
    +import java.util.concurrent.TimeUnit;
    +
    +import static org.apache.flink.util.Preconditions.checkState;
    +
    +/**
    + * Caches {@link SerializedJobExecutionResult}s.
    + *
    + * @see org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler
    + */
    +class JobExecutionResultCache {
    +
    +	private static final int MAX_RESULT_CACHE_DURATION_SECONDS = 300;
    +
    +	private final Cache<JobID, Either<Throwable, SerializedJobExecutionResult>>
    +		jobExecutionResultCache =
    +		CacheBuilder.newBuilder()
    +			.expireAfterWrite(MAX_RESULT_CACHE_DURATION_SECONDS, TimeUnit.SECONDS)
    +			.build();
    +
    +	public void put(final SerializedJobExecutionResult result) {
    +		assertJobExecutionResultNotCached(result.getJobId());
    +		jobExecutionResultCache.put(result.getJobId(), Either.Right(result));
    +	}
    +
    +	public void put(final JobID jobId, Throwable throwable) {
    +		assertJobExecutionResultNotCached(jobId);
    +		jobExecutionResultCache.put(jobId, Either.Left(throwable));
    +	}
    +
    +	public boolean contains(final JobID jobId) {
    +		return jobExecutionResultCache.getIfPresent(jobId) != null;
    +	}
    +
    +	@Nullable
    +	public Either<Throwable, SerializedJobExecutionResult> get(final JobID jobId) {
    --- End diff --

    Not sure if I am abusing Flink's `Either` here.

> Cache JobExecutionResult from finished JobManagerRunners
> --------------------------------------------------------
>
>                 Key: FLINK-8234
>                 URL: https://issues.apache.org/jira/browse/FLINK-8234
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Distributed Coordination
>    Affects Versions: 1.5.0
>            Reporter: Till Rohrmann
>            Assignee: Gary Yao
>              Labels: flip-6
>             Fix For: 1.5.0
>
> In order to serve the {{JobExecutionResults}} we have to cache them in the
> {{Dispatcher}} after the {{JobManagerRunner}} has finished. The cache should
> have a configurable size and should periodically clean up stale entries in
> order to avoid memory leaks.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
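The expire-after-write behaviour that the diff gets from Guava's `CacheBuilder` can be illustrated without any dependency. The sketch below is not the code from the pull request: `ExpiringResultCache` is a hypothetical class, it stores plain values instead of Flink's `Either`, and the injected clock is an addition made here so that expiry can be exercised without waiting.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

// Hypothetical stand-in for a cache with expire-after-write semantics.
final class ExpiringResultCache<K, V> {

    // A cached value together with the time it was written.
    private static final class Entry<T> {
        final T value;
        final long writtenAtMillis;

        Entry(T value, long writtenAtMillis) {
            this.value = value;
            this.writtenAtMillis = writtenAtMillis;
        }
    }

    private final Map<K, Entry<V>> entries = new ConcurrentHashMap<>();
    private final long ttlMillis;
    private final LongSupplier clock;

    ExpiringResultCache(long ttlMillis, LongSupplier clock) {
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    // Like the diff's put(), refuses to overwrite a result that is still cached.
    void put(K key, V value) {
        if (get(key) != null) {
            throw new IllegalStateException("Result for " + key + " is already cached");
        }
        entries.put(key, new Entry<>(value, clock.getAsLong()));
    }

    // Returns the cached value, or null if absent or older than the TTL.
    V get(K key) {
        Entry<V> entry = entries.get(key);
        if (entry == null) {
            return null;
        }
        if (clock.getAsLong() - entry.writtenAtMillis >= ttlMillis) {
            entries.remove(key, entry); // lazily drop the stale entry
            return null;
        }
        return entry.value;
    }

    public static void main(String[] args) {
        ExpiringResultCache<String, String> cache =
            new ExpiringResultCache<>(300_000L, System::currentTimeMillis);
        cache.put("job-1", "FINISHED");
        System.out.println(cache.get("job-1"));
    }
}
```

Stale entries are only dropped when they are looked up here, so this sketch on its own does not give the periodic cleanup the issue asks for; a background sweep would be needed for that.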
[jira] [Commented] (FLINK-8234) Cache JobExecutionResult from finished JobManagerRunners
    [ https://issues.apache.org/jira/browse/FLINK-8234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291334#comment-16291334 ]

ASF GitHub Bot commented on FLINK-8234:
---------------------------------------

Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5168#discussion_r157027633

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/messages/JobExecutionResultNotFoundException.java ---
    @@ -0,0 +1,45 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.messages;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.util.FlinkException;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +/**
    + * Exception indicating that we could not find a
    + * {@link org.apache.flink.api.common.JobExecutionResult} under the given {@link JobID}.
    + */
    +public class JobExecutionResultNotFoundException extends FlinkException {
    +
    +	private final JobID jobId;
    +
    +	private static final long serialVersionUID = 1L;
    --- End diff --

    Should be on top.

> Cache JobExecutionResult from finished JobManagerRunners
> --------------------------------------------------------
>
>                 Key: FLINK-8234
>                 URL: https://issues.apache.org/jira/browse/FLINK-8234
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Distributed Coordination
>    Affects Versions: 1.5.0
>            Reporter: Till Rohrmann
>            Assignee: Gary Yao
>              Labels: flip-6
>             Fix For: 1.5.0
>
> In order to serve the {{JobExecutionResults}} we have to cache them in the
> {{Dispatcher}} after the {{JobManagerRunner}} has finished. The cache should
> have a configurable size and should periodically clean up stale entries in
> order to avoid memory leaks.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Commented] (FLINK-8234) Cache JobExecutionResult from finished JobManagerRunners
    [ https://issues.apache.org/jira/browse/FLINK-8234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291333#comment-16291333 ]

ASF GitHub Bot commented on FLINK-8234:
---------------------------------------

Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5168#discussion_r157027510

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobExecutionResultCache.java ---
    @@ -0,0 +1,79 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.dispatcher;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.runtime.client.SerializedJobExecutionResult;
    +import org.apache.flink.types.Either;
    +
    +import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
    +import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
    +
    +import javax.annotation.Nullable;
    +
    +import java.util.concurrent.TimeUnit;
    +
    +import static org.apache.flink.util.Preconditions.checkState;
    +
    +/**
    + * Caches {@link SerializedJobExecutionResult}s.
    + *
    + * @see org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler
    + */
    +class JobExecutionResultCache {
    +
    +	private static final int MAX_RESULT_CACHE_DURATION_SECONDS = 300;
    +
    +	private final Cache<JobID, Either<Throwable, SerializedJobExecutionResult>>
    +		jobExecutionResultCache =
    +		CacheBuilder.newBuilder()
    +			.expireAfterWrite(MAX_RESULT_CACHE_DURATION_SECONDS, TimeUnit.SECONDS)
    +			.build();
    +
    +	public void put(final SerializedJobExecutionResult result) {
    --- End diff --

    Javadocs are missing.

> Cache JobExecutionResult from finished JobManagerRunners
> --------------------------------------------------------
>
>                 Key: FLINK-8234
>                 URL: https://issues.apache.org/jira/browse/FLINK-8234
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Distributed Coordination
>    Affects Versions: 1.5.0
>            Reporter: Till Rohrmann
>            Assignee: Gary Yao
>              Labels: flip-6
>             Fix For: 1.5.0
>
> In order to serve the {{JobExecutionResults}} we have to cache them in the
> {{Dispatcher}} after the {{JobManagerRunner}} has finished. The cache should
> have a configurable size and should periodically clean up stale entries in
> order to avoid memory leaks.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Commented] (FLINK-8234) Cache JobExecutionResult from finished JobManagerRunners
    [ https://issues.apache.org/jira/browse/FLINK-8234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291332#comment-16291332 ]

ASF GitHub Bot commented on FLINK-8234:
---------------------------------------

Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5168#discussion_r157027178

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestMapperUtils.java ---
    @@ -33,10 +40,23 @@
     		objectMapper.enable(
     			DeserializationFeature.FAIL_ON_IGNORED_PROPERTIES,
     			DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES,
    -			DeserializationFeature.FAIL_ON_READING_DUP_TREE_KEY,
    -			DeserializationFeature.FAIL_ON_MISSING_CREATOR_PROPERTIES);
    --- End diff --

    I had to remove `FAIL_ON_MISSING_CREATOR_PROPERTIES` because `null` fields are not always represented in the JSON. The `RestClient` would otherwise run into problems.

> Cache JobExecutionResult from finished JobManagerRunners
> --------------------------------------------------------
>
>                 Key: FLINK-8234
>                 URL: https://issues.apache.org/jira/browse/FLINK-8234
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Distributed Coordination
>    Affects Versions: 1.5.0
>            Reporter: Till Rohrmann
>            Assignee: Gary Yao
>              Labels: flip-6
>             Fix For: 1.5.0
>
> In order to serve the {{JobExecutionResults}} we have to cache them in the
> {{Dispatcher}} after the {{JobManagerRunner}} has finished. The cache should
> have a configurable size and should periodically clean up stale entries in
> order to avoid memory leaks.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5168#discussion_r157027178

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestMapperUtils.java ---
    @@ -33,10 +40,23 @@
     		objectMapper.enable(
     			DeserializationFeature.FAIL_ON_IGNORED_PROPERTIES,
     			DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES,
    -			DeserializationFeature.FAIL_ON_READING_DUP_TREE_KEY,
    -			DeserializationFeature.FAIL_ON_MISSING_CREATOR_PROPERTIES);
    --- End diff --

    I had to remove `FAIL_ON_MISSING_CREATOR_PROPERTIES` because `null` fields are not always represented in the JSON. The `RestClient` would otherwise run into problems.

---
[jira] [Commented] (FLINK-8234) Cache JobExecutionResult from finished JobManagerRunners
    [ https://issues.apache.org/jira/browse/FLINK-8234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291328#comment-16291328 ]

ASF GitHub Bot commented on FLINK-8234:
---------------------------------------

Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5168#discussion_r157026590

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExecutionResultHandler.java ---
    @@ -0,0 +1,107 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.handler.job;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
    +import org.apache.flink.runtime.messages.JobExecutionResultNotFoundException;
    +import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
    +import org.apache.flink.runtime.rest.handler.HandlerRequest;
    +import org.apache.flink.runtime.rest.handler.RestHandlerException;
    +import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
    +import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
    +import org.apache.flink.runtime.rest.messages.JobMessageParameters;
    +import org.apache.flink.runtime.rest.messages.job.JobExecutionResult;
    +import org.apache.flink.runtime.rest.messages.job.JobExecutionResultHeaders;
    +import org.apache.flink.runtime.rest.messages.job.JobExecutionResultResponseBody;
    +import org.apache.flink.runtime.webmonitor.RestfulGateway;
    +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
    +import org.apache.flink.util.ExceptionUtils;
    +
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
    +
    +import javax.annotation.Nonnull;
    +
    +import java.util.Map;
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.CompletionException;
    +
    +/**
    + * Returns the {@link org.apache.flink.api.common.JobExecutionResult} for a given {@link JobID}.
    + */
    +public class JobExecutionResultHandler
    --- End diff --

    Sample response after running batch WordCount example:
    ```
    {
      "status": {
        "id": "CREATED"
      },
      "job-execution-result": {
        "id": "533a165a6de7f70919a54b1d6f36d3b3",
        "net-runtime": 0,
        "accumulator-results": {
          "94a58184eb17398571f35da42b714517":
[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5168#discussion_r157026590 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExecutionResultHandler.java --- @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.handler.job; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.messages.FlinkJobNotFoundException; +import org.apache.flink.runtime.messages.JobExecutionResultNotFoundException; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.JobMessageParameters; +import org.apache.flink.runtime.rest.messages.job.JobExecutionResult; +import org.apache.flink.runtime.rest.messages.job.JobExecutionResultHeaders; +import org.apache.flink.runtime.rest.messages.job.JobExecutionResultResponseBody; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.ExceptionUtils; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import javax.annotation.Nonnull; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; + +/** + * Returns the {@link org.apache.flink.api.common.JobExecutionResult} for a given {@link JobID}. 
+ */ +public class JobExecutionResultHandler --- End diff -- Sample response after running batch WordCount example: ``` { "status": { "id": "CREATED" }, "job-execution-result": { "id": "533a165a6de7f70919a54b1d6f36d3b3", "net-runtime": 0, "accumulator-results": { "94a58184eb17398571f35da42b714517": "rO0ABXNyABNqYXZhLnV0aWwuQXJyYXlMaXN0eIHSHZnHYZ0DAAFJAARzaXpleHCqdwQAAACqdXIAAltCrPMX+AYIVOACAAB4cAYCYQV1cQB+AAILB2FjdGlvbgF1cQB+AAIKBmFmdGVyAXVxAH4AAgwIYWdhaW5zdAF1cQB+AAIIBGFsbAJ1cQB+AAIIBGFuZAx1cQB+AAIJBWFybXMBdXEAfgACCwdhcnJvd3MBdXEAfgACCQVhd3J5AXVxAH4AAgcDYXkBdXEAfgACCQViYXJlAXVxAH4AAgcDYmUEdXEAfgACCQViZWFyA3VxAH4AAgsHYm9ka2luAXVxAH4AAgoGYm91cm4BdXEAfgACCARidXQBdXEAfgACBwNieQJ1cQB+AAINCWNhbGFtaXR5AXVxAH4AAgkFY2FzdAF1cQB+AAIJBWNvaWwBdXEAfgACCQVjb21lAXVxAH4AAg8LY29uc2NpZW5jZQF1cQB+AAIRDWNvbnN1bW1hdGlvbgF1cQB+AAIOCmNvbnR1bWVseQF1cQB+AAIMCGNvdW50cnkBdXEAfgACDAhjb3dhcmRzAXVxAH4AAg0JY3VycmVudHMBdXEAfgACBgJkBHVxAH4AAgoGZGVhdGgCdXEAfgACCgZkZWxheQF1cQB+AAILB2Rlc3BpcwF1cQB+AAINCWRldm91dGx5AAA
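A note on the sample response above: the accumulator value begins with `rO0ABXNy`, which is what the Java serialization stream header (bytes `AC ED 00 05`) followed by an object and class descriptor marker looks like after Base64 encoding. The following is a minimal sketch of that encoding, assuming the value is plain Java serialization wrapped in Base64. The class `SerializedValueBase64Sketch` and its methods are hypothetical names used for illustration and are not part of Flink.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;

// Hypothetical helper, not Flink code: shows how a Java-serialized value
// turns into a Base64 string like the one in the sample response.
public class SerializedValueBase64Sketch {

    // Java-serialize the value, then Base64-encode the resulting bytes.
    static String encode(Serializable value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return Base64.getEncoder().encodeToString(bytes.toByteArray());
    }

    // Reverse direction. Only deserialize data from a trusted source.
    static Object decode(String base64) throws IOException, ClassNotFoundException {
        byte[] raw = Base64.getDecoder().decode(base64);
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(raw))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        ArrayList<String> words = new ArrayList<>(Arrays.asList("action", "after"));
        String encoded = encode(words);
        // The stream header AC ED 00 05 plus the markers 73 72 encode to "rO0ABXNy".
        System.out.println(encoded.startsWith("rO0ABXNy"));
        System.out.println(decode(encoded).equals(words));
    }
}
```

A client that reads such a response would need the accumulator classes on its classpath to decode the value, which is one reason to keep the payload opaque on the REST layer.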
[jira] [Commented] (FLINK-8234) Cache JobExecutionResult from finished JobManagerRunners
[ https://issues.apache.org/jira/browse/FLINK-8234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291325#comment-16291325 ] ASF GitHub Bot commented on FLINK-8234: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5168#discussion_r157026313 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestMapperUtils.java --- @@ -33,10 +40,23 @@ objectMapper.enable( DeserializationFeature.FAIL_ON_IGNORED_PROPERTIES, DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES, - DeserializationFeature.FAIL_ON_READING_DUP_TREE_KEY, - DeserializationFeature.FAIL_ON_MISSING_CREATOR_PROPERTIES); + DeserializationFeature.FAIL_ON_READING_DUP_TREE_KEY); objectMapper.disable( SerializationFeature.FAIL_ON_EMPTY_BEANS); + + final SimpleModule jacksonFlinkModule = new SimpleModule(); + + final JavaType serializedValueWildcardType = objectMapper + .getTypeFactory() + .constructType(new TypeReference() { + }); + + jacksonFlinkModule.addSerializer(new SerializedValueSerializer(serializedValueWildcardType)); --- End diff -- Could also be done using the `@JsonSerialize` annotation > Cache JobExecutionResult from finished JobManagerRunners > > > Key: FLINK-8234 > URL: https://issues.apache.org/jira/browse/FLINK-8234 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Gary Yao > Labels: flip-6 > Fix For: 1.5.0 > > > In order to serve the {{JobExecutionResults}} we have to cache them in the > {{Dispatcher}} after the {{JobManagerRunner}} has finished. The cache should > have a configurable size and should periodically clean up stale entries in > order to avoid memory leaks. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5168#discussion_r157026313 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestMapperUtils.java --- @@ -33,10 +40,23 @@ objectMapper.enable( DeserializationFeature.FAIL_ON_IGNORED_PROPERTIES, DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES, - DeserializationFeature.FAIL_ON_READING_DUP_TREE_KEY, - DeserializationFeature.FAIL_ON_MISSING_CREATOR_PROPERTIES); + DeserializationFeature.FAIL_ON_READING_DUP_TREE_KEY); objectMapper.disable( SerializationFeature.FAIL_ON_EMPTY_BEANS); + + final SimpleModule jacksonFlinkModule = new SimpleModule(); + + final JavaType serializedValueWildcardType = objectMapper + .getTypeFactory() + .constructType(new TypeReference() { + }); + + jacksonFlinkModule.addSerializer(new SerializedValueSerializer(serializedValueWildcardType)); --- End diff -- Could also be done using the `@JsonSerialize` annotation ---
[jira] [Commented] (FLINK-8234) Cache JobExecutionResult from finished JobManagerRunners
[ https://issues.apache.org/jira/browse/FLINK-8234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291320#comment-16291320 ] ASF GitHub Bot commented on FLINK-8234: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5168#discussion_r157025791 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobExecutionResultCache.java --- @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.client.SerializedJobExecutionResult; +import org.apache.flink.types.Either; + +import org.apache.flink.shaded.guava18.com.google.common.cache.Cache; +import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder; + +import javax.annotation.Nullable; + +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Caches {@link SerializedJobExecutionResult}s. 
+ * + * @see org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler + */ +class JobExecutionResultCache { + + private static final int MAX_RESULT_CACHE_DURATION_SECONDS = 300; + + private final Cache> --- End diff -- Cache isn't size limited. > Cache JobExecutionResult from finished JobManagerRunners > > > Key: FLINK-8234 > URL: https://issues.apache.org/jira/browse/FLINK-8234 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Gary Yao > Labels: flip-6 > Fix For: 1.5.0 > > > In order to serve the {{JobExecutionResults}} we have to cache them in the > {{Dispatcher}} after the {{JobManagerRunner}} has finished. The cache should > have a configurable size and should periodically clean up stale entries in > order to avoid memory leaks. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5168#discussion_r157025791 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobExecutionResultCache.java --- @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.client.SerializedJobExecutionResult; +import org.apache.flink.types.Either; + +import org.apache.flink.shaded.guava18.com.google.common.cache.Cache; +import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder; + +import javax.annotation.Nullable; + +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Caches {@link SerializedJobExecutionResult}s. + * + * @see org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler + */ +class JobExecutionResultCache { + + private static final int MAX_RESULT_CACHE_DURATION_SECONDS = 300; + + private final Cache> --- End diff -- Cache isn't size limited. ---
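To illustrate the remark that the cache is not size limited: the sketch below bounds a result cache both by entry count and by age, which is what the issue description asks for (configurable size, stale entries removed). It uses only the JDK so that it is self-contained. `BoundedResultCacheSketch` and all of its members are hypothetical names, not the PR's code. With the shaded Guava `CacheBuilder` that the PR already imports, the same effect would presumably come from combining `maximumSize(long)` with `expireAfterWrite(long, TimeUnit)`.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.LongSupplier;

// Hypothetical stand-in for a size- and time-bounded result cache.
public class BoundedResultCacheSketch<K, V> {

    private static final class Timestamped<V> {
        final V value;
        final long writtenAtMillis;

        Timestamped(V value, long writtenAtMillis) {
            this.value = value;
            this.writtenAtMillis = writtenAtMillis;
        }
    }

    private final long ttlMillis;
    private final LongSupplier clock;
    private final LinkedHashMap<K, Timestamped<V>> entries;

    public BoundedResultCacheSketch(final int capacity, long ttlMillis, LongSupplier clock) {
        this.ttlMillis = ttlMillis;
        this.clock = clock;
        // Insertion-ordered map that drops its oldest entry once capacity is exceeded.
        this.entries = new LinkedHashMap<K, Timestamped<V>>() {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, Timestamped<V>> eldest) {
                return size() > capacity;
            }
        };
    }

    public synchronized void put(K key, V value) {
        entries.put(key, new Timestamped<>(value, clock.getAsLong()));
    }

    // Returns null if the key is unknown or its entry has expired.
    public synchronized V get(K key) {
        Timestamped<V> entry = entries.get(key);
        if (entry == null) {
            return null;
        }
        if (isExpired(entry)) {
            entries.remove(key);
            return null;
        }
        return entry.value;
    }

    // Removes all expired entries; intended to be called periodically.
    public synchronized void cleanUp() {
        Iterator<Timestamped<V>> it = entries.values().iterator();
        while (it.hasNext()) {
            if (isExpired(it.next())) {
                it.remove();
            }
        }
    }

    public synchronized int size() {
        return entries.size();
    }

    private boolean isExpired(Timestamped<V> entry) {
        return clock.getAsLong() - entry.writtenAtMillis >= ttlMillis;
    }
}
```

The clock is injected so that expiry can be tested without sleeping; production code would pass `System::currentTimeMillis`.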
[jira] [Commented] (FLINK-7878) Extend the resource type user can define in ResourceSpec
[ https://issues.apache.org/jira/browse/FLINK-7878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291255#comment-16291255 ] ASF GitHub Bot commented on FLINK-7878: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4911 > Extend the resource type user can define in ResourceSpec > > > Key: FLINK-7878 > URL: https://issues.apache.org/jira/browse/FLINK-7878 > Project: Flink > Issue Type: Improvement > Components: DataSet API, DataStream API >Reporter: shuai.xu >Assignee: shuai.xu > Labels: flip-6 > Fix For: 1.5.0 > > > Currently, Flink only lets users define how much CPU and memory an operator uses, > but the resources in a cluster are varied. For example, an application for > image processing may need a GPU, while others may need an FPGA. > CPU and memory alone are not enough, and the number of resource types keeps > growing, so we need to make the ResourceSpec extensible. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4911: [FLINK-7878] [api] make resource type extendible i...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4911 ---
[jira] [Resolved] (FLINK-7878) Extend the resource type user can define in ResourceSpec
[ https://issues.apache.org/jira/browse/FLINK-7878?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-7878. -- Resolution: Fixed Fix Version/s: 1.5.0 Fixed via 5b9ac9508b5d16f85b76a6de940458d385e23f0d > Extend the resource type user can define in ResourceSpec > > > Key: FLINK-7878 > URL: https://issues.apache.org/jira/browse/FLINK-7878 > Project: Flink > Issue Type: Improvement > Components: DataSet API, DataStream API >Reporter: shuai.xu >Assignee: shuai.xu > Labels: flip-6 > Fix For: 1.5.0 > > > Currently, Flink only lets users define how much CPU and memory an operator uses, > but the resources in a cluster are varied. For example, an application for > image processing may need a GPU, while others may need an FPGA. > CPU and memory alone are not enough, and the number of resource types keeps > growing, so we need to make the ResourceSpec extensible. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
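As a rough illustration of what an extensible resource specification could look like: the sketch below keeps the fixed CPU and memory fields and adds an open-ended map for further resource types such as GPU or FPGA. This is only a sketch under assumptions. `ExtensibleResourceSpecSketch`, its fields, and its methods are hypothetical and do not reproduce the `ResourceSpec` API that was merged for this issue.

```java
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration, not Flink's ResourceSpec.
public final class ExtensibleResourceSpecSketch {

    private final double cpuCores;
    private final int heapMemoryMB;
    // Open-ended resources keyed by name, for example "GPU" or "FPGA".
    private final Map<String, Double> extendedResources;

    public ExtensibleResourceSpecSketch(double cpuCores, int heapMemoryMB) {
        this(cpuCores, heapMemoryMB, Collections.<String, Double>emptyMap());
    }

    private ExtensibleResourceSpecSketch(
            double cpuCores, int heapMemoryMB, Map<String, Double> extendedResources) {
        this.cpuCores = cpuCores;
        this.heapMemoryMB = heapMemoryMB;
        this.extendedResources = Collections.unmodifiableMap(new TreeMap<>(extendedResources));
    }

    // Returns a copy with one additional (or overwritten) extended resource.
    public ExtensibleResourceSpecSketch with(String name, double amount) {
        Map<String, Double> copy = new TreeMap<>(extendedResources);
        copy.put(name, amount);
        return new ExtensibleResourceSpecSketch(cpuCores, heapMemoryMB, copy);
    }

    // Sums two specs, e.g. for operators that run in the same slot.
    public ExtensibleResourceSpecSketch merge(ExtensibleResourceSpecSketch other) {
        Map<String, Double> merged = new TreeMap<>(extendedResources);
        for (Map.Entry<String, Double> e : other.extendedResources.entrySet()) {
            merged.merge(e.getKey(), e.getValue(), Double::sum);
        }
        return new ExtensibleResourceSpecSketch(
                cpuCores + other.cpuCores, heapMemoryMB + other.heapMemoryMB, merged);
    }

    public double getCpuCores() {
        return cpuCores;
    }

    public int getHeapMemoryMB() {
        return heapMemoryMB;
    }

    // Amount of the named extended resource, or 0.0 if none was requested.
    public double getExtended(String name) {
        Double amount = extendedResources.get(name);
        return amount == null ? 0.0 : amount;
    }
}
```

Keeping the extra resources in a map means a new resource type needs no new field or setter, at the cost of weaker compile-time checking of resource names.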
[jira] [Commented] (FLINK-8234) Cache JobExecutionResult from finished JobManagerRunners
[ https://issues.apache.org/jira/browse/FLINK-8234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291245#comment-16291245 ] ASF GitHub Bot commented on FLINK-8234: --- GitHub user GJL opened a pull request: https://github.com/apache/flink/pull/5168 [FLINK-8234][flip6] WIP WIP @tillrohrmann You can merge this pull request into a Git repository by running: $ git pull https://github.com/GJL/flink FLINK-8234 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5168.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5168 commit cc969846791bf818fbc81feb241a188410431ae5 Author: gyao Date: 2017-12-14T16:27:16Z [FLINK-8234][flip6] WIP > Cache JobExecutionResult from finished JobManagerRunners > > > Key: FLINK-8234 > URL: https://issues.apache.org/jira/browse/FLINK-8234 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Gary Yao > Labels: flip-6 > Fix For: 1.5.0 > > > In order to serve the {{JobExecutionResults}} we have to cache them in the > {{Dispatcher}} after the {{JobManagerRunner}} has finished. The cache should > have a configurable size and should periodically clean up stale entries in > order to avoid memory leaks. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP
GitHub user GJL opened a pull request: https://github.com/apache/flink/pull/5168 [FLINK-8234][flip6] WIP WIP @tillrohrmann You can merge this pull request into a Git repository by running: $ git pull https://github.com/GJL/flink FLINK-8234 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5168.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5168 commit cc969846791bf818fbc81feb241a188410431ae5 Author: gyao Date: 2017-12-14T16:27:16Z [FLINK-8234][flip6] WIP ---
[jira] [Commented] (FLINK-7956) Add support for scheduling with slot sharing
[ https://issues.apache.org/jira/browse/FLINK-7956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291158#comment-16291158 ] ASF GitHub Bot commented on FLINK-7956: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5091 > Add support for scheduling with slot sharing > > > Key: FLINK-7956 > URL: https://issues.apache.org/jira/browse/FLINK-7956 > Project: Flink > Issue Type: Sub-task > Components: Scheduler >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.5.0 > > > In order to reach feature equivalence with the old code base, we should add > support for scheduling with slot sharing to the {{SlotPool}}. This will also > allow us to run all the IT cases based on the {{AbstractTestBase}} on the > Flip-6 {{MiniCluster}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-7956) Add support for scheduling with slot sharing
[ https://issues.apache.org/jira/browse/FLINK-7956?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-7956. Resolution: Done Fix Version/s: 1.5.0 Added via 0ef7fddeff8430fd40d2d7a1b8a6454fd9416ced > Add support for scheduling with slot sharing > > > Key: FLINK-7956 > URL: https://issues.apache.org/jira/browse/FLINK-7956 > Project: Flink > Issue Type: Sub-task > Components: Scheduler >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.5.0 > > > In order to reach feature equivalence with the old code base, we should add > support for scheduling with slot sharing to the {{SlotPool}}. This will also > allow us to run all the IT cases based on the {{AbstractTestBase}} on the > Flip-6 {{MiniCluster}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5091 ---
[jira] [Commented] (FLINK-7956) Add support for scheduling with slot sharing
[ https://issues.apache.org/jira/browse/FLINK-7956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291153#comment-16291153 ] ASF GitHub Bot commented on FLINK-7956: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/5091 Thanks for the review @GJL and @ifndef-SleePy. Travis passed locally. Merging this PR. > Add support for scheduling with slot sharing > > > Key: FLINK-7956 > URL: https://issues.apache.org/jira/browse/FLINK-7956 > Project: Flink > Issue Type: Sub-task > Components: Scheduler >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > > In order to reach feature equivalence with the old code base, we should add > support for scheduling with slot sharing to the {{SlotPool}}. This will also > allow us to run all the IT cases based on the {{AbstractTestBase}} on the > Flip-6 {{MiniCluster}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #5091: [FLINK-7956] [flip6] Add support for queued scheduling wi...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/5091 Thanks for the review @GJL and @ifndef-SleePy. Travis passed locally. Merging this PR. ---
[jira] [Assigned] (FLINK-8234) Cache JobExecutionResult from finished JobManagerRunners
[ https://issues.apache.org/jira/browse/FLINK-8234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gary Yao reassigned FLINK-8234: --- Assignee: Gary Yao > Cache JobExecutionResult from finished JobManagerRunners > > > Key: FLINK-8234 > URL: https://issues.apache.org/jira/browse/FLINK-8234 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Gary Yao > Labels: flip-6 > Fix For: 1.5.0 > > > In order to serve the {{JobExecutionResults}} we have to cache them in the > {{Dispatcher}} after the {{JobManagerRunner}} has finished. The cache should > have a configurable size and should periodically clean up stale entries in > order to avoid memory leaks. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4612: [FLINK-7452] [types] Add helper methods for all bu...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4612#discussion_r156958542 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java --- @@ -19,56 +19,422 @@ package org.apache.flink.api.common.typeinfo; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.InvalidTypesException; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.EitherTypeInfo; +import org.apache.flink.api.java.typeutils.EnumTypeInfo; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.api.java.typeutils.ListTypeInfo; +import org.apache.flink.api.java.typeutils.MapTypeInfo; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; +import org.apache.flink.api.java.typeutils.PojoField; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.api.java.typeutils.ValueTypeInfo; +import org.apache.flink.types.Either; +import org.apache.flink.types.Row; +import org.apache.flink.types.Value; +import java.lang.reflect.Field; import java.math.BigDecimal; +import java.math.BigInteger; import java.sql.Date; import java.sql.Time; import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; /** - * This class gives access to the type information of the most most common types. + * This class gives access to the type information of the most common types for which Flink + * has built-in serializers and comparators. + * + * In many cases, Flink tries to analyze generic signatures of functions to determine return + * types automatically. This class is intended for cases where type information has to be + * supplied manually or would result in an inefficient type. 
+ * + * Please note that the Scala API and Table API provide more specialized Types classes. + * (See org.apache.flink.api.scala.Types and org.apache.flink.table.api.Types) + * + * A more convenient alternative might be a {@link TypeHint}. + * + * @see TypeInformation#of(Class) specify type information based on a class that will be analyzed + * @see TypeInformation#of(TypeHint) specify type information based on a {@link TypeHint} */ @PublicEvolving public class Types { - public static final BasicTypeInfo STRING = BasicTypeInfo.STRING_TYPE_INFO; - public static final BasicTypeInfo BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO; - public static final BasicTypeInfo BYTE = BasicTypeInfo.BYTE_TYPE_INFO; - public static final BasicTypeInfo SHORT = BasicTypeInfo.SHORT_TYPE_INFO; - public static final BasicTypeInfo INT = BasicTypeInfo.INT_TYPE_INFO; - public static final BasicTypeInfo LONG = BasicTypeInfo.LONG_TYPE_INFO; - public static final BasicTypeInfo FLOAT = BasicTypeInfo.FLOAT_TYPE_INFO; - public static final BasicTypeInfo DOUBLE = BasicTypeInfo.DOUBLE_TYPE_INFO; - public static final BasicTypeInfo DECIMAL = BasicTypeInfo.BIG_DEC_TYPE_INFO; + /** +* Returns type information for {@link java.lang.Void}. Does not support a null value. +*/ + public static final TypeInformation VOID = BasicTypeInfo.VOID_TYPE_INFO; + + /** +* Returns type information for {@link java.lang.String}. Supports a null value. +*/ + public static final TypeInformation STRING = BasicTypeInfo.STRING_TYPE_INFO; + + /** +* Returns type information for both a primitive byte and {@link java.lang.Byte}. +* Does not support a null value. +*/ + public static final TypeInformation BYTE = BasicTypeInfo.BYTE_TYPE_INFO; + + /** +* Returns type information for both a primitive boolean and {@link java.lang.Boolean}. +* Does not support a null value. 
+*/ + public static final TypeInformation BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO; + + /** +* Returns type information for both a primitive short and {@link java.lang.Short}. +* Does not support a null value. +*/ + public static final TypeInformation SHORT = BasicTypeInfo.SHORT_TYPE_INFO; + + /** +* Returns type information for both a primitive int and {@link java.lang.Integer}. +* Does not support a null value. +*/ + public static final TypeInformation INT = BasicTypeInfo.INT_TYPE_INFO; + + /** +* Returns type information for both a primitive long and {@link java.lang.Long}. +* Does not support a null
[GitHub] flink pull request #4612: [FLINK-7452] [types] Add helper methods for all bu...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4612#discussion_r156958975 --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/Types.scala --- @@ -0,0 +1,371 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala.typeutils + +import org.apache.flink.annotation.PublicEvolving +import org.apache.flink.api.common.functions.InvalidTypesException +import org.apache.flink.api.common.typeinfo.{TypeInformation, Types => JTypes} +import org.apache.flink.types.Row + +import _root_.scala.collection.JavaConverters._ +import _root_.scala.util.{Either, Try} + +/** + * This class gives access to the type information of the most common Scala types for which Flink + * has built-in serializers and comparators. + * + * This class contains types of [[org.apache.flink.api.common.typeinfo.Types]] and adds + * types for Scala specific classes (such as [[Unit]] or case classes). + * + * In many cases, Flink tries to analyze generic signatures of functions to determine return + * types automatically. This class is intended for cases where type information has to be + * supplied manually or would result in an inefficient type. 
+ * + * Scala macros allow to determine type information of classes and type parameters. You can + * use [[Types.of]] to let type information be determined automatically. + */ +@PublicEvolving +object Types { + + /** +* Generates type information based on the given class and/or its type parameters. +* +* The definition is similar to a [[org.apache.flink.api.common.typeinfo.TypeHint]] but does +* not require to implement anonymous classes. +* +* If the class could not be analyzed by the Scala type analyzer, the Java analyzer +* will be used. +* +* Example use: +* +* `Types.of[(Int, String, String)]` for Scala tuples +* `Types.of[Unit]` for Scala specific types +* +* @tparam T class to be analyzed +*/ + def of[T: TypeInformation]: TypeInformation[T] = { +val typeInfo: TypeInformation[T] = implicitly[TypeInformation[T]] +typeInfo + } + + /** +* Returns type information for Scala [[Nothing]]. Does not support a null value. +*/ + val NOTHING: TypeInformation[Nothing] = new ScalaNothingTypeInfo + + /** +* Returns type information for Scala [[Unit]]. Does not support a null value. +*/ + val UNIT: TypeInformation[Unit] = new UnitTypeInfo + + /** +* Returns type information for [[String]] and [[java.lang.String]]. Supports a null value. +*/ + val STRING: TypeInformation[String] = JTypes.STRING + + /** +* Returns type information for primitive [[Byte]] and [[java.lang.Byte]]. Does not +* support a null value. +*/ + val BYTE: TypeInformation[java.lang.Byte] = JTypes.BYTE + + /** +* Returns type information for primitive [[Boolean]] and [[java.lang.Boolean]]. Does not +* support a null value. +*/ + val BOOLEAN: TypeInformation[java.lang.Boolean] = JTypes.BOOLEAN + + /** +* Returns type information for primitive [[Short]] and [[java.lang.Short]]. Does not +* support a null value. +*/ + val SHORT: TypeInformation[java.lang.Short] = JTypes.SHORT + + /** +* Returns type information for primitive [[Int]] and [[java.lang.Integer]]. Does not +* support a null value. 
+*/ + val INT: TypeInformation[java.lang.Integer] = JTypes.INT + + /** +* Returns type information for primitive [[Long]] and [[java.lang.Long]]. Does not +* support a null value. +*/ + val LONG: TypeInformation[java.lang.Long] = JTypes.LONG + + /** +* Returns type information for primitive [[Float]] and [[java.lang.Float]]. Does not
[GitHub] flink pull request #4612: [FLINK-7452] [types] Add helper methods for all bu...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4612#discussion_r156951752 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java --- @@ -19,56 +19,422 @@ package org.apache.flink.api.common.typeinfo; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.InvalidTypesException; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.EitherTypeInfo; +import org.apache.flink.api.java.typeutils.EnumTypeInfo; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.api.java.typeutils.ListTypeInfo; +import org.apache.flink.api.java.typeutils.MapTypeInfo; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; +import org.apache.flink.api.java.typeutils.PojoField; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.api.java.typeutils.ValueTypeInfo; +import org.apache.flink.types.Either; +import org.apache.flink.types.Row; +import org.apache.flink.types.Value; +import java.lang.reflect.Field; import java.math.BigDecimal; +import java.math.BigInteger; import java.sql.Date; import java.sql.Time; import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; /** - * This class gives access to the type information of the most most common types. + * This class gives access to the type information of the most common types for which Flink + * has built-in serializers and comparators. + * + * In many cases, Flink tries to analyze generic signatures of functions to determine return + * types automatically. This class is intended for cases where type information has to be + * supplied manually or would result in an inefficient type. 
+ * + * Please note that the Scala API and Table API provide more specialized Types classes. + * (See org.apache.flink.api.scala.Types and org.apache.flink.table.api.Types) + * + * A more convenient alternative might be a {@link TypeHint}. + * + * @see TypeInformation#of(Class) specify type information based on a class that will be analyzed + * @see TypeInformation#of(TypeHint) specify type information based on a {@link TypeHint} */ @PublicEvolving public class Types { - public static final BasicTypeInfo STRING = BasicTypeInfo.STRING_TYPE_INFO; - public static final BasicTypeInfo BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO; - public static final BasicTypeInfo BYTE = BasicTypeInfo.BYTE_TYPE_INFO; - public static final BasicTypeInfo SHORT = BasicTypeInfo.SHORT_TYPE_INFO; - public static final BasicTypeInfo INT = BasicTypeInfo.INT_TYPE_INFO; - public static final BasicTypeInfo LONG = BasicTypeInfo.LONG_TYPE_INFO; - public static final BasicTypeInfo FLOAT = BasicTypeInfo.FLOAT_TYPE_INFO; - public static final BasicTypeInfo DOUBLE = BasicTypeInfo.DOUBLE_TYPE_INFO; - public static final BasicTypeInfo DECIMAL = BasicTypeInfo.BIG_DEC_TYPE_INFO; + /** +* Returns type information for {@link java.lang.Void}. Does not support a null value. +*/ + public static final TypeInformation VOID = BasicTypeInfo.VOID_TYPE_INFO; + + /** +* Returns type information for {@link java.lang.String}. Supports a null value. +*/ + public static final TypeInformation STRING = BasicTypeInfo.STRING_TYPE_INFO; + + /** +* Returns type information for both a primitive byte and {@link java.lang.Byte}. +* Does not support a null value. +*/ + public static final TypeInformation BYTE = BasicTypeInfo.BYTE_TYPE_INFO; + + /** +* Returns type information for both a primitive boolean and {@link java.lang.Boolean}. +* Does not support a null value. 
+*/ + public static final TypeInformation BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO; + + /** +* Returns type information for both a primitive short and {@link java.lang.Short}. +* Does not support a null value. +*/ + public static final TypeInformation SHORT = BasicTypeInfo.SHORT_TYPE_INFO; + + /** +* Returns type information for both a primitive int and {@link java.lang.Integer}. +* Does not support a null value. +*/ + public static final TypeInformation INT = BasicTypeInfo.INT_TYPE_INFO; + + /** +* Returns type information for both a primitive long and {@link java.lang.Long}. +* Does not support a null
[GitHub] flink pull request #4612: [FLINK-7452] [types] Add helper methods for all bu...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4612#discussion_r156938437 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java --- @@ -19,56 +19,422 @@ package org.apache.flink.api.common.typeinfo; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.InvalidTypesException; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.EitherTypeInfo; +import org.apache.flink.api.java.typeutils.EnumTypeInfo; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.api.java.typeutils.ListTypeInfo; +import org.apache.flink.api.java.typeutils.MapTypeInfo; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; +import org.apache.flink.api.java.typeutils.PojoField; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.api.java.typeutils.ValueTypeInfo; +import org.apache.flink.types.Either; +import org.apache.flink.types.Row; +import org.apache.flink.types.Value; +import java.lang.reflect.Field; import java.math.BigDecimal; +import java.math.BigInteger; import java.sql.Date; import java.sql.Time; import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; /** - * This class gives access to the type information of the most most common types. + * This class gives access to the type information of the most common types for which Flink + * has built-in serializers and comparators. + * + * In many cases, Flink tries to analyze generic signatures of functions to determine return + * types automatically. This class is intended for cases where type information has to be + * supplied manually or would result in an inefficient type. 
+ * + * Please note that the Scala API and Table API provide more specialized Types classes.
--- End diff --
"provide more specialized" -> "have dedicated"?
---
[GitHub] flink pull request #4612: [FLINK-7452] [types] Add helper methods for all bu...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4612#discussion_r156942641 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java --- @@ -19,56 +19,422 @@ package org.apache.flink.api.common.typeinfo; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.InvalidTypesException; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.EitherTypeInfo; +import org.apache.flink.api.java.typeutils.EnumTypeInfo; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.api.java.typeutils.ListTypeInfo; +import org.apache.flink.api.java.typeutils.MapTypeInfo; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; +import org.apache.flink.api.java.typeutils.PojoField; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.api.java.typeutils.ValueTypeInfo; +import org.apache.flink.types.Either; +import org.apache.flink.types.Row; +import org.apache.flink.types.Value; +import java.lang.reflect.Field; import java.math.BigDecimal; +import java.math.BigInteger; import java.sql.Date; import java.sql.Time; import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; /** - * This class gives access to the type information of the most most common types. + * This class gives access to the type information of the most common types for which Flink + * has built-in serializers and comparators. + * + * In many cases, Flink tries to analyze generic signatures of functions to determine return + * types automatically. This class is intended for cases where type information has to be + * supplied manually or would result in an inefficient type. 
+ * + * Please note that the Scala API and Table API provide more specialized Types classes. + * (See org.apache.flink.api.scala.Types and org.apache.flink.table.api.Types) + * + * A more convenient alternative might be a {@link TypeHint}. + * + * @see TypeInformation#of(Class) specify type information based on a class that will be analyzed + * @see TypeInformation#of(TypeHint) specify type information based on a {@link TypeHint} */ @PublicEvolving public class Types { - public static final BasicTypeInfo STRING = BasicTypeInfo.STRING_TYPE_INFO; - public static final BasicTypeInfo BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO; - public static final BasicTypeInfo BYTE = BasicTypeInfo.BYTE_TYPE_INFO; - public static final BasicTypeInfo SHORT = BasicTypeInfo.SHORT_TYPE_INFO; - public static final BasicTypeInfo INT = BasicTypeInfo.INT_TYPE_INFO; - public static final BasicTypeInfo LONG = BasicTypeInfo.LONG_TYPE_INFO; - public static final BasicTypeInfo FLOAT = BasicTypeInfo.FLOAT_TYPE_INFO; - public static final BasicTypeInfo DOUBLE = BasicTypeInfo.DOUBLE_TYPE_INFO; - public static final BasicTypeInfo DECIMAL = BasicTypeInfo.BIG_DEC_TYPE_INFO; + /** +* Returns type information for {@link java.lang.Void}. Does not support a null value. +*/ + public static final TypeInformation VOID = BasicTypeInfo.VOID_TYPE_INFO; + + /** +* Returns type information for {@link java.lang.String}. Supports a null value. +*/ + public static final TypeInformation STRING = BasicTypeInfo.STRING_TYPE_INFO; + + /** +* Returns type information for both a primitive byte and {@link java.lang.Byte}. +* Does not support a null value. +*/ + public static final TypeInformation BYTE = BasicTypeInfo.BYTE_TYPE_INFO; + + /** +* Returns type information for both a primitive boolean and {@link java.lang.Boolean}. +* Does not support a null value. 
+*/ + public static final TypeInformation BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO; + + /** +* Returns type information for both a primitive short and {@link java.lang.Short}. +* Does not support a null value. +*/ + public static final TypeInformation SHORT = BasicTypeInfo.SHORT_TYPE_INFO; + + /** +* Returns type information for both a primitive int and {@link java.lang.Integer}. +* Does not support a null value. +*/ + public static final TypeInformation INT = BasicTypeInfo.INT_TYPE_INFO; + + /** +* Returns type information for both a primitive long and {@link java.lang.Long}. +* Does not support a null
[jira] [Commented] (FLINK-7452) Add helper methods for all built-in Flink types to Types
[ https://issues.apache.org/jira/browse/FLINK-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291076#comment-16291076 ] ASF GitHub Bot commented on FLINK-7452: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4612#discussion_r156945371 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java --- @@ -19,56 +19,408 @@ package org.apache.flink.api.common.typeinfo; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.InvalidTypesException; +import org.apache.flink.api.java.typeutils.EitherTypeInfo; +import org.apache.flink.api.java.typeutils.EnumTypeInfo; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.api.java.typeutils.ListTypeInfo; +import org.apache.flink.api.java.typeutils.MapTypeInfo; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; +import org.apache.flink.api.java.typeutils.PojoField; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.api.java.typeutils.ValueTypeInfo; -import java.math.BigDecimal; -import java.sql.Date; -import java.sql.Time; -import java.sql.Timestamp; +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; /** - * This class gives access to the type information of the most most common types. + * This class gives access to the type information of the most common types for which Flink + * has built-in serializers and comparators. + * + * In many cases, Flink tries to analyze generic signatures of functions to determine return + * types automatically. 
This class is intended for cases where the extraction is not possible + * (or inefficient) as well as cases where type information has to be supplied manually. + * + * Depending on the API you are using (e.g. Scala API or Table API), there might be a more + * specialized Types class. + * + * A more convenient alternative might be a {@link TypeHint}. + * + * @see TypeInformation#of(Class) specify type information based on a class that will be analyzed + * @see TypeInformation#of(TypeHint) specify type information based on a {@link TypeHint} */ @PublicEvolving public class Types { - public static final BasicTypeInfo STRING = BasicTypeInfo.STRING_TYPE_INFO; - public static final BasicTypeInfo BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO; - public static final BasicTypeInfo BYTE = BasicTypeInfo.BYTE_TYPE_INFO; - public static final BasicTypeInfo SHORT = BasicTypeInfo.SHORT_TYPE_INFO; - public static final BasicTypeInfo INT = BasicTypeInfo.INT_TYPE_INFO; - public static final BasicTypeInfo LONG = BasicTypeInfo.LONG_TYPE_INFO; - public static final BasicTypeInfo FLOAT = BasicTypeInfo.FLOAT_TYPE_INFO; - public static final BasicTypeInfo DOUBLE = BasicTypeInfo.DOUBLE_TYPE_INFO; - public static final BasicTypeInfo DECIMAL = BasicTypeInfo.BIG_DEC_TYPE_INFO; + /** +* Returns type information for {@link java.lang.Void}. Does not support a null value. +*/ + public static final TypeInformation VOID = BasicTypeInfo.VOID_TYPE_INFO; + + /** +* Returns type information for {@link java.lang.String}. Supports a null value. +*/ + public static final TypeInformation STRING = BasicTypeInfo.STRING_TYPE_INFO; + + /** +* Returns type information for both a primitive byte and a +* wrapped {@link java.lang.Byte}. Does not support a null value. +*/ + public static final TypeInformation BYTE = BasicTypeInfo.BYTE_TYPE_INFO; + + /** +* Returns type information for both a primitive boolean and a +* wrapped {@link java.lang.Boolean}. Does not support a null value. 
+*/ + public static final TypeInformation BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO; + + /** +* Returns type information for both a primitive short and a +* wrapped {@link java.lang.Short}. Does not support a null value. +*/ + public static final TypeInformation SHORT = BasicTypeInfo.SHORT_TYPE_INFO; - public static final SqlTimeTypeInfo SQL_DATE = SqlTimeTypeInfo.DATE; - public static final SqlTimeTypeInfo SQL_TIME = SqlTimeTypeInfo.TIME; - public static final SqlTimeTypeInfo SQL_TIMESTAMP = SqlTimeTypeInfo.TIMESTAMP; + /** +* Returns type information for both a primitive int and a +* wrapped
[GitHub] flink pull request #4612: [FLINK-7452] [types] Add helper methods for all bu...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4612#discussion_r156951434 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java --- @@ -19,56 +19,422 @@ package org.apache.flink.api.common.typeinfo; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.InvalidTypesException; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.EitherTypeInfo; +import org.apache.flink.api.java.typeutils.EnumTypeInfo; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.api.java.typeutils.ListTypeInfo; +import org.apache.flink.api.java.typeutils.MapTypeInfo; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; +import org.apache.flink.api.java.typeutils.PojoField; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.api.java.typeutils.ValueTypeInfo; +import org.apache.flink.types.Either; +import org.apache.flink.types.Row; +import org.apache.flink.types.Value; +import java.lang.reflect.Field; import java.math.BigDecimal; +import java.math.BigInteger; import java.sql.Date; import java.sql.Time; import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; /** - * This class gives access to the type information of the most most common types. + * This class gives access to the type information of the most common types for which Flink + * has built-in serializers and comparators. + * + * In many cases, Flink tries to analyze generic signatures of functions to determine return + * types automatically. This class is intended for cases where type information has to be + * supplied manually or would result in an inefficient type. 
+ * + * Please note that the Scala API and Table API provide more specialized Types classes. + * (See org.apache.flink.api.scala.Types and org.apache.flink.table.api.Types) + * + * A more convenient alternative might be a {@link TypeHint}. + * + * @see TypeInformation#of(Class) specify type information based on a class that will be analyzed + * @see TypeInformation#of(TypeHint) specify type information based on a {@link TypeHint} */ @PublicEvolving public class Types { - public static final BasicTypeInfo STRING = BasicTypeInfo.STRING_TYPE_INFO; - public static final BasicTypeInfo BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO; - public static final BasicTypeInfo BYTE = BasicTypeInfo.BYTE_TYPE_INFO; - public static final BasicTypeInfo SHORT = BasicTypeInfo.SHORT_TYPE_INFO; - public static final BasicTypeInfo INT = BasicTypeInfo.INT_TYPE_INFO; - public static final BasicTypeInfo LONG = BasicTypeInfo.LONG_TYPE_INFO; - public static final BasicTypeInfo FLOAT = BasicTypeInfo.FLOAT_TYPE_INFO; - public static final BasicTypeInfo DOUBLE = BasicTypeInfo.DOUBLE_TYPE_INFO; - public static final BasicTypeInfo DECIMAL = BasicTypeInfo.BIG_DEC_TYPE_INFO; + /** +* Returns type information for {@link java.lang.Void}. Does not support a null value. +*/ + public static final TypeInformation VOID = BasicTypeInfo.VOID_TYPE_INFO; + + /** +* Returns type information for {@link java.lang.String}. Supports a null value. +*/ + public static final TypeInformation STRING = BasicTypeInfo.STRING_TYPE_INFO; + + /** +* Returns type information for both a primitive byte and {@link java.lang.Byte}. +* Does not support a null value. +*/ + public static final TypeInformation BYTE = BasicTypeInfo.BYTE_TYPE_INFO; + + /** +* Returns type information for both a primitive boolean and {@link java.lang.Boolean}. +* Does not support a null value. 
+*/ + public static final TypeInformation BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO; + + /** +* Returns type information for both a primitive short and {@link java.lang.Short}. +* Does not support a null value. +*/ + public static final TypeInformation SHORT = BasicTypeInfo.SHORT_TYPE_INFO; + + /** +* Returns type information for both a primitive int and {@link java.lang.Integer}. +* Does not support a null value. +*/ + public static final TypeInformation INT = BasicTypeInfo.INT_TYPE_INFO; + + /** +* Returns type information for both a primitive long and {@link java.lang.Long}. +* Does not support a null
[GitHub] flink pull request #4612: [FLINK-7452] [types] Add helper methods for all bu...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4612#discussion_r156949716 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java --- @@ -19,56 +19,422 @@ package org.apache.flink.api.common.typeinfo; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.InvalidTypesException; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.EitherTypeInfo; +import org.apache.flink.api.java.typeutils.EnumTypeInfo; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.api.java.typeutils.ListTypeInfo; +import org.apache.flink.api.java.typeutils.MapTypeInfo; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; +import org.apache.flink.api.java.typeutils.PojoField; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.api.java.typeutils.ValueTypeInfo; +import org.apache.flink.types.Either; +import org.apache.flink.types.Row; +import org.apache.flink.types.Value; +import java.lang.reflect.Field; import java.math.BigDecimal; +import java.math.BigInteger; import java.sql.Date; import java.sql.Time; import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; /** - * This class gives access to the type information of the most most common types. + * This class gives access to the type information of the most common types for which Flink + * has built-in serializers and comparators. + * + * In many cases, Flink tries to analyze generic signatures of functions to determine return + * types automatically. This class is intended for cases where type information has to be + * supplied manually or would result in an inefficient type. 
+ * + * Please note that the Scala API and Table API provide more specialized Types classes. + * (See org.apache.flink.api.scala.Types and org.apache.flink.table.api.Types) + * + * A more convenient alternative might be a {@link TypeHint}. + * + * @see TypeInformation#of(Class) specify type information based on a class that will be analyzed + * @see TypeInformation#of(TypeHint) specify type information based on a {@link TypeHint} */ @PublicEvolving public class Types { - public static final BasicTypeInfo STRING = BasicTypeInfo.STRING_TYPE_INFO; - public static final BasicTypeInfo BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO; - public static final BasicTypeInfo BYTE = BasicTypeInfo.BYTE_TYPE_INFO; - public static final BasicTypeInfo SHORT = BasicTypeInfo.SHORT_TYPE_INFO; - public static final BasicTypeInfo INT = BasicTypeInfo.INT_TYPE_INFO; - public static final BasicTypeInfo LONG = BasicTypeInfo.LONG_TYPE_INFO; - public static final BasicTypeInfo FLOAT = BasicTypeInfo.FLOAT_TYPE_INFO; - public static final BasicTypeInfo DOUBLE = BasicTypeInfo.DOUBLE_TYPE_INFO; - public static final BasicTypeInfo DECIMAL = BasicTypeInfo.BIG_DEC_TYPE_INFO; + /** +* Returns type information for {@link java.lang.Void}. Does not support a null value. +*/ + public static final TypeInformation VOID = BasicTypeInfo.VOID_TYPE_INFO; + + /** +* Returns type information for {@link java.lang.String}. Supports a null value. +*/ + public static final TypeInformation STRING = BasicTypeInfo.STRING_TYPE_INFO; + + /** +* Returns type information for both a primitive byte and {@link java.lang.Byte}. +* Does not support a null value. +*/ + public static final TypeInformation BYTE = BasicTypeInfo.BYTE_TYPE_INFO; + + /** +* Returns type information for both a primitive boolean and {@link java.lang.Boolean}. +* Does not support a null value. 
+*/ + public static final TypeInformation BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO; + + /** +* Returns type information for both a primitive short and {@link java.lang.Short}. +* Does not support a null value. +*/ + public static final TypeInformation SHORT = BasicTypeInfo.SHORT_TYPE_INFO; + + /** +* Returns type information for both a primitive int and {@link java.lang.Integer}. +* Does not support a null value. +*/ + public static final TypeInformation INT = BasicTypeInfo.INT_TYPE_INFO; + + /** +* Returns type information for both a primitive long and {@link java.lang.Long}. +* Does not support a null
[GitHub] flink pull request #4612: [FLINK-7452] [types] Add helper methods for all bu...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4612#discussion_r156945371 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java --- @@ -19,56 +19,408 @@ package org.apache.flink.api.common.typeinfo; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.InvalidTypesException; +import org.apache.flink.api.java.typeutils.EitherTypeInfo; +import org.apache.flink.api.java.typeutils.EnumTypeInfo; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.api.java.typeutils.ListTypeInfo; +import org.apache.flink.api.java.typeutils.MapTypeInfo; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; +import org.apache.flink.api.java.typeutils.PojoField; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.api.java.typeutils.ValueTypeInfo; -import java.math.BigDecimal; -import java.sql.Date; -import java.sql.Time; -import java.sql.Timestamp; +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; /** - * This class gives access to the type information of the most most common types. + * This class gives access to the type information of the most common types for which Flink + * has built-in serializers and comparators. + * + * In many cases, Flink tries to analyze generic signatures of functions to determine return + * types automatically. This class is intended for cases where the extraction is not possible + * (or inefficient) as well as cases where type information has to be supplied manually. + * + * Depending on the API you are using (e.g. Scala API or Table API), there might be a more + * specialized Types class. 
+ * + * A more convenient alternative might be a {@link TypeHint}. + * + * @see TypeInformation#of(Class) specify type information based on a class that will be analyzed + * @see TypeInformation#of(TypeHint) specify type information based on a {@link TypeHint} */ @PublicEvolving public class Types { - public static final BasicTypeInfo STRING = BasicTypeInfo.STRING_TYPE_INFO; - public static final BasicTypeInfo BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO; - public static final BasicTypeInfo BYTE = BasicTypeInfo.BYTE_TYPE_INFO; - public static final BasicTypeInfo SHORT = BasicTypeInfo.SHORT_TYPE_INFO; - public static final BasicTypeInfo INT = BasicTypeInfo.INT_TYPE_INFO; - public static final BasicTypeInfo LONG = BasicTypeInfo.LONG_TYPE_INFO; - public static final BasicTypeInfo FLOAT = BasicTypeInfo.FLOAT_TYPE_INFO; - public static final BasicTypeInfo DOUBLE = BasicTypeInfo.DOUBLE_TYPE_INFO; - public static final BasicTypeInfo DECIMAL = BasicTypeInfo.BIG_DEC_TYPE_INFO; + /** +* Returns type information for {@link java.lang.Void}. Does not support a null value. +*/ + public static final TypeInformation VOID = BasicTypeInfo.VOID_TYPE_INFO; + + /** +* Returns type information for {@link java.lang.String}. Supports a null value. +*/ + public static final TypeInformation STRING = BasicTypeInfo.STRING_TYPE_INFO; + + /** +* Returns type information for both a primitive byte and a +* wrapped {@link java.lang.Byte}. Does not support a null value. +*/ + public static final TypeInformation BYTE = BasicTypeInfo.BYTE_TYPE_INFO; + + /** +* Returns type information for both a primitive boolean and a +* wrapped {@link java.lang.Boolean}. Does not support a null value. +*/ + public static final TypeInformation BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO; + + /** +* Returns type information for both a primitive short and a +* wrapped {@link java.lang.Short}. Does not support a null value. 
+*/ + public static final TypeInformation SHORT = BasicTypeInfo.SHORT_TYPE_INFO; - public static final SqlTimeTypeInfo SQL_DATE = SqlTimeTypeInfo.DATE; - public static final SqlTimeTypeInfo SQL_TIME = SqlTimeTypeInfo.TIME; - public static final SqlTimeTypeInfo SQL_TIMESTAMP = SqlTimeTypeInfo.TIMESTAMP; + /** +* Returns type information for both a primitive int and a +* wrapped {@link java.lang.Integer}. Does not support a null value. +*/ + public static final TypeInformation INT = BasicTypeInfo.INT_TYPE_INFO; + + /** +* Returns type information for both a primitive long and a +
[GitHub] flink pull request #4612: [FLINK-7452] [types] Add helper methods for all bu...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4612#discussion_r156955846 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala --- @@ -25,55 +27,125 @@ import org.apache.flink.types.Row import _root_.scala.annotation.varargs /** - * This class enumerates all supported types of the Table API. + * This class enumerates all supported types of the Table API & SQL. */ object Types { - val STRING = JTypes.STRING - val BOOLEAN = JTypes.BOOLEAN + /** +* Returns type information for a Table API string or SQL VARCHAR type. +*/ + val STRING: TypeInformation[String] = JTypes.STRING + + /** +* Returns type information for a Table API boolean or SQL BOOLEAN type. +*/ + val BOOLEAN: TypeInformation[lang.Boolean] = JTypes.BOOLEAN + + /** +* Returns type information for a Table API byte or SQL TINYINT type. +*/ + val BYTE: TypeInformation[lang.Byte] = JTypes.BYTE + + /** +* Returns type information for a Table API short or SQL SMALLINT type. +*/ + val SHORT: TypeInformation[lang.Short] = JTypes.SHORT + + /** +* Returns type information for a Table API integer or SQL INT/INTEGER type. +*/ + val INT: TypeInformation[lang.Integer] = JTypes.INT - val BYTE = JTypes.BYTE - val SHORT = JTypes.SHORT - val INT = JTypes.INT - val LONG = JTypes.LONG - val FLOAT = JTypes.FLOAT - val DOUBLE = JTypes.DOUBLE - val DECIMAL = JTypes.DECIMAL + /** +* Returns type information for a Table API long or SQL BIGINT type. +*/ + val LONG: TypeInformation[lang.Long] = JTypes.LONG - val SQL_DATE = JTypes.SQL_DATE - val SQL_TIME = JTypes.SQL_TIME - val SQL_TIMESTAMP = JTypes.SQL_TIMESTAMP - val INTERVAL_MONTHS = TimeIntervalTypeInfo.INTERVAL_MONTHS - val INTERVAL_MILLIS = TimeIntervalTypeInfo.INTERVAL_MILLIS + /** +* Returns type information for a Table API float or SQL FLOAT/REAL type. 
+*/ + val FLOAT: TypeInformation[lang.Float] = JTypes.FLOAT + + /** +* Returns type information for a Table API double or SQL DOUBLE type. +*/ + val DOUBLE: TypeInformation[lang.Double] = JTypes.DOUBLE /** -* Generates row type information. +* Returns type information for a Table API big decimal or SQL DECIMAL type. +*/ + val DECIMAL: TypeInformation[math.BigDecimal] = JTypes.BIG_DEC + + /** +* Returns type information for a Table API SQL date or SQL DATE type. +*/ + val SQL_DATE: TypeInformation[sql.Date] = JTypes.SQL_DATE
--- End diff --
The [Table API docs](https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/tableApi.html#data-types) and [SQL docs](https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/sql.html#data-types) need to be updated for `SQL_DATE`, `SQL_TIME`, and `SQL_TIMESTAMP`.
---
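For readers following the docs discussion, the correspondence between the Table API `Types` fields and SQL type names stated in the Scaladoc of the quoted diff can be summarized as a small lookup table. This is only a sketch: the class `TableSqlTypeNames` and its `mapping()` method are hypothetical names made up for illustration and are not part of Flink, and the table covers only the fields whose Scaladoc is visible in the diff above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not a Flink class: it only restates the Scaladoc of the quoted diff.
public class TableSqlTypeNames {

    // Keys: field names of org.apache.flink.table.api.Types as shown in the diff.
    // Values: the SQL type names named in the corresponding Scaladoc comments.
    public static Map<String, String> mapping() {
        Map<String, String> m = new LinkedHashMap<>();
        m.put("STRING", "VARCHAR");
        m.put("BOOLEAN", "BOOLEAN");
        m.put("BYTE", "TINYINT");
        m.put("SHORT", "SMALLINT");
        m.put("INT", "INT/INTEGER");
        m.put("LONG", "BIGINT");
        m.put("FLOAT", "FLOAT/REAL");
        m.put("DOUBLE", "DOUBLE");
        m.put("DECIMAL", "DECIMAL");
        m.put("SQL_DATE", "DATE");
        return m;
    }

    public static void main(String[] args) {
        // Print one line per documented field, in the order of the diff.
        mapping().forEach((field, sql) ->
            System.out.println("Types." + field + " -> SQL " + sql));
    }
}
```

`SQL_TIME` and `SQL_TIMESTAMP` are left out on purpose because their Scaladoc is not part of the quoted excerpt.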
[jira] [Commented] (FLINK-7452) Add helper methods for all built-in Flink types to Types
[ https://issues.apache.org/jira/browse/FLINK-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291064#comment-16291064 ] ASF GitHub Bot commented on FLINK-7452: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4612#discussion_r156938239 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java --- @@ -19,56 +19,417 @@ package org.apache.flink.api.common.typeinfo; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.InvalidTypesException; +import org.apache.flink.api.java.typeutils.EitherTypeInfo; +import org.apache.flink.api.java.typeutils.EnumTypeInfo; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.api.java.typeutils.ListTypeInfo; +import org.apache.flink.api.java.typeutils.MapTypeInfo; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; +import org.apache.flink.api.java.typeutils.PojoField; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.api.java.typeutils.ValueTypeInfo; -import java.math.BigDecimal; -import java.sql.Date; -import java.sql.Time; -import java.sql.Timestamp; +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; /** - * This class gives access to the type information of the most most common types. + * This class gives access to the type information of the most common types for which Flink + * has built-in serializers and comparators. + * + * In many cases, Flink tries to analyze generic signatures of functions to determine return + * types automatically. This class is intended for cases where type information has to be + * supplied manually or would result in an inefficient type. 
--- End diff --
rephrase as suggested?

> Add helper methods for all built-in Flink types to Types
>
> Key: FLINK-7452
> URL: https://issues.apache.org/jira/browse/FLINK-7452
> Project: Flink
> Issue Type: Improvement
> Components: Type Serialization System
> Reporter: Timo Walther
> Assignee: Timo Walther
>
> Sometimes it is very difficult to provide `TypeInformation` manually, in case some extraction fails or is not available. {{TypeHint}}s should be the preferred way, but these methods can ensure correct types.
> I propose to add all built-in Flink types to {{Types}}, such as:
> {code}
> Types.POJO(MyPojo.class)
> Types.POJO(Map)
> Types.GENERIC(Object.class)
> Types.TUPLE(TypeInformation, ...)
> Types.MAP(TypeInformation, TypeInformation)
> {code}
> The methods should validate that the returned type is exactly the requested type and, especially in the case of POJO, should help with creating a {{PojoTypeInfo}}.
> Once this is in place, we can deprecate the {{TypeInfoParser}}.

-- This message was sent by Atlassian JIRA (v6.4.14#64029)
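The validation idea in the issue description ("the returned type is exactly the requested type") can be illustrated without Flink on the classpath. The following is a minimal sketch under stated assumptions: `TypesSketch`, `TypeInfo`, `PojoInfo`, `GenericInfo`, and `extract` are hypothetical stand-ins invented for this example, and the POJO rule used here (public no-argument constructor plus at least one public instance field) is a simplification, not Flink's actual rule set.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for the proposed helpers; none of these types are Flink classes.
public class TypesSketch {

    public interface TypeInfo { String describe(); }

    public static final class SimpleInfo implements TypeInfo {
        private final String text;
        SimpleInfo(String text) { this.text = text; }
        @Override public String describe() { return text; }
    }

    public static final class GenericInfo implements TypeInfo {
        private final Class<?> clazz;
        GenericInfo(Class<?> clazz) { this.clazz = clazz; }
        @Override public String describe() { return "Generic<" + clazz.getSimpleName() + ">"; }
    }

    public static final class PojoInfo implements TypeInfo {
        private final Class<?> clazz;
        private final List<String> fields;
        PojoInfo(Class<?> clazz, List<String> fields) { this.clazz = clazz; this.fields = fields; }
        @Override public String describe() { return "Pojo<" + clazz.getSimpleName() + ">" + fields; }
    }

    public static final TypeInfo STRING = new SimpleInfo("String");
    public static final TypeInfo INT = new SimpleInfo("Integer");

    /** Sample class that satisfies the simplified POJO rule of this sketch. */
    public static class Person {
        public String name;
        public int age;
        public Person() {}
    }

    // Simplified "extractor": falls back to a generic type when the POJO rule is not met.
    static TypeInfo extract(Class<?> clazz) {
        try {
            clazz.getConstructor(); // requires a public no-argument constructor
        } catch (NoSuchMethodException e) {
            return new GenericInfo(clazz);
        }
        List<String> names = new ArrayList<>();
        for (Field f : clazz.getFields()) {
            if (!Modifier.isStatic(f.getModifiers())) {
                names.add(f.getName());
            }
        }
        if (names.isEmpty()) {
            return new GenericInfo(clazz);
        }
        Collections.sort(names); // reflection order is unspecified, so sort for stable output
        return new PojoInfo(clazz, names);
    }

    // Validating factory: fails loudly instead of silently returning a generic type.
    public static TypeInfo POJO(Class<?> clazz) {
        TypeInfo info = extract(clazz);
        if (!(info instanceof PojoInfo)) {
            throw new IllegalArgumentException(clazz.getName() + " is not a POJO in this sketch");
        }
        return info;
    }

    public static TypeInfo GENERIC(Class<?> clazz) { return new GenericInfo(clazz); }

    public static TypeInfo MAP(TypeInfo key, TypeInfo value) {
        return new SimpleInfo("Map<" + key.describe() + ", " + value.describe() + ">");
    }

    public static TypeInfo TUPLE(TypeInfo... fields) {
        List<String> parts = new ArrayList<>();
        for (TypeInfo f : fields) {
            parts.add(f.describe());
        }
        return new SimpleInfo("Tuple<" + String.join(", ", parts) + ">");
    }

    public static void main(String[] args) {
        System.out.println(POJO(Person.class).describe());
        System.out.println(MAP(STRING, INT).describe());
        System.out.println(TUPLE(STRING, INT, GENERIC(Object.class)).describe());
    }
}
```

The design point is that `POJO(...)` throws when the analysis would otherwise fall back to a generic type, so a caller who asks for a POJO type never receives a less efficient one unnoticed.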
[jira] [Commented] (FLINK-7452) Add helper methods for all built-in Flink types to Types
[ https://issues.apache.org/jira/browse/FLINK-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291075#comment-16291075 ] ASF GitHub Bot commented on FLINK-7452: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4612#discussion_r156937819 --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/Types.scala --- @@ -0,0 +1,371 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala.typeutils + +import org.apache.flink.annotation.PublicEvolving +import org.apache.flink.api.common.functions.InvalidTypesException +import org.apache.flink.api.common.typeinfo.{TypeInformation, Types => JTypes} +import org.apache.flink.types.Row + +import _root_.scala.collection.JavaConverters._ +import _root_.scala.util.{Either, Try} + +/** + * This class gives access to the type information of the most common Scala types for which Flink + * has built-in serializers and comparators. + * + * This class contains types of [[org.apache.flink.api.common.typeinfo.Types]] and adds + * types for Scala specific classes (such as [[Unit]] or case classes). 
+ * + * In many cases, Flink tries to analyze generic signatures of functions to determine return + * types automatically. This class is intended for cases where type information has to be + * supplied manually or would result in an inefficient type. + * + * Scala macros allow to determine type information of classes and type parameters. You can + * use [[Types.of]] to let type information be determined automatically. + */ +@PublicEvolving +object Types { + + /** +* Generates type information based on the given class and/or its type parameters. +* +* The definition is similar to a [[org.apache.flink.api.common.typeinfo.TypeHint]] but does +* not require to implement anonymous classes. +* +* If the class could not be analyzed by the Scala type analyzer, the Java analyzer +* will be used. +* +* Example use: +* +* `Types.of[(Int, String, String)]` for Scala tuples +* `Types.of[Unit]` for Scala specific types +* +* @tparam T class to be analyzed +*/ + def of[T: TypeInformation]: TypeInformation[T] = { +val typeInfo: TypeInformation[T] = implicitly[TypeInformation[T]] +typeInfo + } + + /** +* Returns type information for Scala [[Nothing]]. Does not support a null value. +*/ + val NOTHING: TypeInformation[Nothing] = new ScalaNothingTypeInfo + + /** +* Returns type information for Scala [[Unit]]. Does not support a null value. +*/ + val UNIT: TypeInformation[Unit] = new UnitTypeInfo + + /** +* Returns type information for [[String]] and [[java.lang.String]]. Supports a null value. +*/ + val STRING: TypeInformation[String] = JTypes.STRING + + /** +* Returns type information for primitive [[Byte]] and [[java.lang.Byte]]. Does not +* support a null value. +*/ + val BYTE: TypeInformation[java.lang.Byte] = JTypes.BYTE + + /** +* Returns type information for primitive [[Boolean]] and [[java.lang.Boolean]]. Does not +* support a null value. 
+*/ + val BOOLEAN: TypeInformation[java.lang.Boolean] = JTypes.BOOLEAN + + /** +* Returns type information for primitive [[Short]] and [[java.lang.Short]]. Does not +* support a null value. +*/ + val SHORT: TypeInformation[java.lang.Short] = JTypes.SHORT + + /** +* Returns type information for primitive [[Int]] and [[java.lang.Integer]]. Does not +* support a null value. +*/ + val INT: TypeInformation[java.lang.Integer] = JTypes.INT + + /** +* Returns type information for primitive [[Long]] and
[jira] [Commented] (FLINK-7452) Add helper methods for all built-in Flink types to Types
[ https://issues.apache.org/jira/browse/FLINK-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291081#comment-16291081 ] ASF GitHub Bot commented on FLINK-7452: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4612#discussion_r156949716 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java --- @@ -19,56 +19,422 @@ package org.apache.flink.api.common.typeinfo; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.InvalidTypesException; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.EitherTypeInfo; +import org.apache.flink.api.java.typeutils.EnumTypeInfo; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.api.java.typeutils.ListTypeInfo; +import org.apache.flink.api.java.typeutils.MapTypeInfo; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; +import org.apache.flink.api.java.typeutils.PojoField; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.api.java.typeutils.ValueTypeInfo; +import org.apache.flink.types.Either; +import org.apache.flink.types.Row; +import org.apache.flink.types.Value; +import java.lang.reflect.Field; import java.math.BigDecimal; +import java.math.BigInteger; import java.sql.Date; import java.sql.Time; import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; /** - * This class gives access to the type information of the most most common types. + * This class gives access to the type information of the most common types for which Flink + * has built-in serializers and comparators. 
+ * + * In many cases, Flink tries to analyze generic signatures of functions to determine return + * types automatically. This class is intended for cases where type information has to be + * supplied manually or would result in an inefficient type. + * + * Please note that the Scala API and Table API provide more specialized Types classes. + * (See org.apache.flink.api.scala.Types and org.apache.flink.table.api.Types) + * + * A more convenient alternative might be a {@link TypeHint}. + * + * @see TypeInformation#of(Class) specify type information based on a class that will be analyzed + * @see TypeInformation#of(TypeHint) specify type information based on a {@link TypeHint} */ @PublicEvolving public class Types { - public static final BasicTypeInfo STRING = BasicTypeInfo.STRING_TYPE_INFO; - public static final BasicTypeInfo BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO; - public static final BasicTypeInfo BYTE = BasicTypeInfo.BYTE_TYPE_INFO; - public static final BasicTypeInfo SHORT = BasicTypeInfo.SHORT_TYPE_INFO; - public static final BasicTypeInfo INT = BasicTypeInfo.INT_TYPE_INFO; - public static final BasicTypeInfo LONG = BasicTypeInfo.LONG_TYPE_INFO; - public static final BasicTypeInfo FLOAT = BasicTypeInfo.FLOAT_TYPE_INFO; - public static final BasicTypeInfo DOUBLE = BasicTypeInfo.DOUBLE_TYPE_INFO; - public static final BasicTypeInfo DECIMAL = BasicTypeInfo.BIG_DEC_TYPE_INFO; + /** +* Returns type information for {@link java.lang.Void}. Does not support a null value. +*/ + public static final TypeInformation VOID = BasicTypeInfo.VOID_TYPE_INFO; + + /** +* Returns type information for {@link java.lang.String}. Supports a null value. +*/ + public static final TypeInformation STRING = BasicTypeInfo.STRING_TYPE_INFO; + + /** +* Returns type information for both a primitive byte and {@link java.lang.Byte}. +* Does not support a null value. 
+*/ + public static final TypeInformation BYTE = BasicTypeInfo.BYTE_TYPE_INFO; + + /** +* Returns type information for both a primitive boolean and {@link java.lang.Boolean}. +* Does not support a null value. +*/ + public static final TypeInformation BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO; + + /** +* Returns type information for both a primitive short and {@link java.lang.Short}. +* Does not support a null value. +*/ + public static final TypeInformation SHORT = BasicTypeInfo.SHORT_TYPE_INFO; + + /** +* Returns type information for both a primitive int and {@link java.lang.Integer}. +* Does not support a null value.
[jira] [Commented] (FLINK-7452) Add helper methods for all built-in Flink types to Types
[ https://issues.apache.org/jira/browse/FLINK-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291073#comment-16291073 ] ASF GitHub Bot commented on FLINK-7452: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4612#discussion_r156951434 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java --- @@ -19,56 +19,422 @@ package org.apache.flink.api.common.typeinfo; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.InvalidTypesException; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.EitherTypeInfo; +import org.apache.flink.api.java.typeutils.EnumTypeInfo; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.api.java.typeutils.ListTypeInfo; +import org.apache.flink.api.java.typeutils.MapTypeInfo; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; +import org.apache.flink.api.java.typeutils.PojoField; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.api.java.typeutils.ValueTypeInfo; +import org.apache.flink.types.Either; +import org.apache.flink.types.Row; +import org.apache.flink.types.Value; +import java.lang.reflect.Field; import java.math.BigDecimal; +import java.math.BigInteger; import java.sql.Date; import java.sql.Time; import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; /** - * This class gives access to the type information of the most most common types. + * This class gives access to the type information of the most common types for which Flink + * has built-in serializers and comparators. 
+ * + * In many cases, Flink tries to analyze generic signatures of functions to determine return + * types automatically. This class is intended for cases where type information has to be + * supplied manually or would result in an inefficient type. + * + * Please note that the Scala API and Table API provide more specialized Types classes. + * (See org.apache.flink.api.scala.Types and org.apache.flink.table.api.Types) + * + * A more convenient alternative might be a {@link TypeHint}. + * + * @see TypeInformation#of(Class) specify type information based on a class that will be analyzed + * @see TypeInformation#of(TypeHint) specify type information based on a {@link TypeHint} */ @PublicEvolving public class Types { - public static final BasicTypeInfo STRING = BasicTypeInfo.STRING_TYPE_INFO; - public static final BasicTypeInfo BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO; - public static final BasicTypeInfo BYTE = BasicTypeInfo.BYTE_TYPE_INFO; - public static final BasicTypeInfo SHORT = BasicTypeInfo.SHORT_TYPE_INFO; - public static final BasicTypeInfo INT = BasicTypeInfo.INT_TYPE_INFO; - public static final BasicTypeInfo LONG = BasicTypeInfo.LONG_TYPE_INFO; - public static final BasicTypeInfo FLOAT = BasicTypeInfo.FLOAT_TYPE_INFO; - public static final BasicTypeInfo DOUBLE = BasicTypeInfo.DOUBLE_TYPE_INFO; - public static final BasicTypeInfo DECIMAL = BasicTypeInfo.BIG_DEC_TYPE_INFO; + /** +* Returns type information for {@link java.lang.Void}. Does not support a null value. +*/ + public static final TypeInformation VOID = BasicTypeInfo.VOID_TYPE_INFO; + + /** +* Returns type information for {@link java.lang.String}. Supports a null value. +*/ + public static final TypeInformation STRING = BasicTypeInfo.STRING_TYPE_INFO; + + /** +* Returns type information for both a primitive byte and {@link java.lang.Byte}. +* Does not support a null value. 
+*/ + public static final TypeInformation BYTE = BasicTypeInfo.BYTE_TYPE_INFO; + + /** +* Returns type information for both a primitive boolean and {@link java.lang.Boolean}. +* Does not support a null value. +*/ + public static final TypeInformation BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO; + + /** +* Returns type information for both a primitive short and {@link java.lang.Short}. +* Does not support a null value. +*/ + public static final TypeInformation SHORT = BasicTypeInfo.SHORT_TYPE_INFO; + + /** +* Returns type information for both a primitive int and {@link java.lang.Integer}. +* Does not support a null value.
[GitHub] flink pull request #4612: [FLINK-7452] [types] Add helper methods for all bu...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4612#discussion_r156940963 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java --- @@ -19,56 +19,422 @@ package org.apache.flink.api.common.typeinfo; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.InvalidTypesException; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.EitherTypeInfo; +import org.apache.flink.api.java.typeutils.EnumTypeInfo; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.api.java.typeutils.ListTypeInfo; +import org.apache.flink.api.java.typeutils.MapTypeInfo; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; +import org.apache.flink.api.java.typeutils.PojoField; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.api.java.typeutils.ValueTypeInfo; +import org.apache.flink.types.Either; +import org.apache.flink.types.Row; +import org.apache.flink.types.Value; +import java.lang.reflect.Field; import java.math.BigDecimal; +import java.math.BigInteger; import java.sql.Date; import java.sql.Time; import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; /** - * This class gives access to the type information of the most most common types. + * This class gives access to the type information of the most common types for which Flink + * has built-in serializers and comparators. + * + * In many cases, Flink tries to analyze generic signatures of functions to determine return + * types automatically. This class is intended for cases where type information has to be + * supplied manually or would result in an inefficient type. 
+ * + * Please note that the Scala API and Table API provide more specialized Types classes. + * (See org.apache.flink.api.scala.Types and org.apache.flink.table.api.Types) + * + * A more convenient alternative might be a {@link TypeHint}. + * + * @see TypeInformation#of(Class) specify type information based on a class that will be analyzed + * @see TypeInformation#of(TypeHint) specify type information based on a {@link TypeHint} */ @PublicEvolving public class Types { - public static final BasicTypeInfo STRING = BasicTypeInfo.STRING_TYPE_INFO; - public static final BasicTypeInfo BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO; - public static final BasicTypeInfo BYTE = BasicTypeInfo.BYTE_TYPE_INFO; - public static final BasicTypeInfo SHORT = BasicTypeInfo.SHORT_TYPE_INFO; - public static final BasicTypeInfo INT = BasicTypeInfo.INT_TYPE_INFO; - public static final BasicTypeInfo LONG = BasicTypeInfo.LONG_TYPE_INFO; - public static final BasicTypeInfo FLOAT = BasicTypeInfo.FLOAT_TYPE_INFO; - public static final BasicTypeInfo DOUBLE = BasicTypeInfo.DOUBLE_TYPE_INFO; - public static final BasicTypeInfo DECIMAL = BasicTypeInfo.BIG_DEC_TYPE_INFO; + /** +* Returns type information for {@link java.lang.Void}. Does not support a null value. +*/ + public static final TypeInformation VOID = BasicTypeInfo.VOID_TYPE_INFO; + + /** +* Returns type information for {@link java.lang.String}. Supports a null value. +*/ + public static final TypeInformation STRING = BasicTypeInfo.STRING_TYPE_INFO; + + /** +* Returns type information for both a primitive byte and {@link java.lang.Byte}. +* Does not support a null value. +*/ + public static final TypeInformation BYTE = BasicTypeInfo.BYTE_TYPE_INFO; + + /** +* Returns type information for both a primitive boolean and {@link java.lang.Boolean}. +* Does not support a null value. 
+*/ + public static final TypeInformation BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO; + + /** +* Returns type information for both a primitive short and {@link java.lang.Short}. +* Does not support a null value. +*/ + public static final TypeInformation SHORT = BasicTypeInfo.SHORT_TYPE_INFO; + + /** +* Returns type information for both a primitive int and {@link java.lang.Integer}. +* Does not support a null value. +*/ + public static final TypeInformation INT = BasicTypeInfo.INT_TYPE_INFO; + + /** +* Returns type information for both a primitive long and {@link java.lang.Long}. +* Does not support a null
[jira] [Commented] (FLINK-7452) Add helper methods for all built-in Flink types to Types
[ https://issues.apache.org/jira/browse/FLINK-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291065#comment-16291065 ] ASF GitHub Bot commented on FLINK-7452: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4612#discussion_r156940309 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala --- @@ -25,55 +27,125 @@ import org.apache.flink.types.Row import _root_.scala.annotation.varargs /** - * This class enumerates all supported types of the Table API. + * This class enumerates all supported types of the Table API & SQL. */ object Types { - val STRING = JTypes.STRING - val BOOLEAN = JTypes.BOOLEAN + /** +* Returns type information for a Table API string or SQL VARCHAR type. +*/ + val STRING: TypeInformation[String] = JTypes.STRING + + /** +* Returns type information for a Table API boolean or SQL BOOLEAN type. +*/ + val BOOLEAN: TypeInformation[lang.Boolean] = JTypes.BOOLEAN + + /** +* Returns type information for a Table API byte or SQL TINYINT type. +*/ + val BYTE: TypeInformation[lang.Byte] = JTypes.BYTE + + /** +* Returns type information for a Table API short or SQL SMALLINT type. +*/ + val SHORT: TypeInformation[lang.Short] = JTypes.SHORT + + /** +* Returns type information for a Table API integer or SQL INT/INTEGER type. +*/ + val INT: TypeInformation[lang.Integer] = JTypes.INT - val BYTE = JTypes.BYTE - val SHORT = JTypes.SHORT - val INT = JTypes.INT - val LONG = JTypes.LONG - val FLOAT = JTypes.FLOAT - val DOUBLE = JTypes.DOUBLE - val DECIMAL = JTypes.DECIMAL + /** +* Returns type information for a Table API long or SQL BIGINT type. 
+*/ + val LONG: TypeInformation[lang.Long] = JTypes.LONG - val SQL_DATE = JTypes.SQL_DATE - val SQL_TIME = JTypes.SQL_TIME - val SQL_TIMESTAMP = JTypes.SQL_TIMESTAMP - val INTERVAL_MONTHS = TimeIntervalTypeInfo.INTERVAL_MONTHS - val INTERVAL_MILLIS = TimeIntervalTypeInfo.INTERVAL_MILLIS + /** +* Returns type information for a Table API float or SQL FLOAT/REAL type. +*/ + val FLOAT: TypeInformation[lang.Float] = JTypes.FLOAT + + /** +* Returns type information for a Table API integer or SQL DOUBLE type. +*/ + val DOUBLE: TypeInformation[lang.Double] = JTypes.DOUBLE /** -* Generates row type information. +* Returns type information for a Table API big decimal or SQL DECIMAL type. +*/ + val DECIMAL: TypeInformation[math.BigDecimal] = JTypes.BIG_DEC + + /** +* Returns type information for a Table API SQL date or SQL DATE type. +*/ + val SQL_DATE: TypeInformation[sql.Date] = JTypes.SQL_DATE + + /** +* Returns type information for a Table API SQL time or SQL TIME type. +*/ + val SQL_TIME: TypeInformation[sql.Time] = JTypes.SQL_TIME + + /** +* Returns type information for a Table API SQL timestamp or SQL TIMESTAMP type. +*/ + val SQL_TIMESTAMP: TypeInformation[sql.Timestamp] = JTypes.SQL_TIMESTAMP + + /** +* Returns type information for a Table API interval of months. +*/ + val INTERVAL_MONTHS: TypeInformation[lang.Integer] = TimeIntervalTypeInfo.INTERVAL_MONTHS + + /** +* Returns type information for a Table API interval milliseconds. +*/ + val INTERVAL_MILLIS: TypeInformation[lang.Long] = TimeIntervalTypeInfo.INTERVAL_MILLIS + + /** +* Returns type information for [[org.apache.flink.types.Row]] with fields of the given types. +* +* A row is a variable-length, null-aware composite type for storing multiple values in a +* deterministic field order. Every field can be null independent of the field's type. +* The type of row fields cannot be automatically inferred; therefore, it is required to pass +* type information whenever a row is used. 
* -* A row type consists of zero or more fields with a field name and a corresponding type. +* The schema of rows can have up to Integer.MAX_VALUE fields, however, all row instances +* must have the same length otherwise serialization fails or information is lost. * -* The fields have the default names (f0, f1, f2 ..). +* This method generates type information with fields of the given types; the fields have +* the default names (f0, f1, f2 ..). * -* @param types types of row fields; e.g. Types.STRING, Types.INT +* @param types The types of the row fields, e.g.,
[jira] [Commented] (FLINK-7452) Add helper methods for all built-in Flink types to Types
[ https://issues.apache.org/jira/browse/FLINK-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291067#comment-16291067 ] ASF GitHub Bot commented on FLINK-7452: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4612#discussion_r156937828 --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/Types.scala --- @@ -0,0 +1,371 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala.typeutils + +import org.apache.flink.annotation.PublicEvolving +import org.apache.flink.api.common.functions.InvalidTypesException +import org.apache.flink.api.common.typeinfo.{TypeInformation, Types => JTypes} +import org.apache.flink.types.Row + +import _root_.scala.collection.JavaConverters._ +import _root_.scala.util.{Either, Try} + +/** + * This class gives access to the type information of the most common Scala types for which Flink + * has built-in serializers and comparators. + * + * This class contains types of [[org.apache.flink.api.common.typeinfo.Types]] and adds + * types for Scala specific classes (such as [[Unit]] or case classes). 
+ * + * In many cases, Flink tries to analyze generic signatures of functions to determine return + * types automatically. This class is intended for cases where type information has to be + * supplied manually or would result in an inefficient type. + * + * Scala macros allow to determine type information of classes and type parameters. You can + * use [[Types.of]] to let type information be determined automatically. + */ +@PublicEvolving +object Types { + + /** +* Generates type information based on the given class and/or its type parameters. +* +* The definition is similar to a [[org.apache.flink.api.common.typeinfo.TypeHint]] but does +* not require to implement anonymous classes. +* +* If the class could not be analyzed by the Scala type analyzer, the Java analyzer +* will be used. +* +* Example use: +* +* `Types.of[(Int, String, String)]` for Scala tuples +* `Types.of[Unit]` for Scala specific types +* +* @tparam T class to be analyzed +*/ + def of[T: TypeInformation]: TypeInformation[T] = { +val typeInfo: TypeInformation[T] = implicitly[TypeInformation[T]] +typeInfo + } + + /** +* Returns type information for Scala [[Nothing]]. Does not support a null value. +*/ + val NOTHING: TypeInformation[Nothing] = new ScalaNothingTypeInfo + + /** +* Returns type information for Scala [[Unit]]. Does not support a null value. +*/ + val UNIT: TypeInformation[Unit] = new UnitTypeInfo + + /** +* Returns type information for [[String]] and [[java.lang.String]]. Supports a null value. +*/ + val STRING: TypeInformation[String] = JTypes.STRING + + /** +* Returns type information for primitive [[Byte]] and [[java.lang.Byte]]. Does not +* support a null value. +*/ + val BYTE: TypeInformation[java.lang.Byte] = JTypes.BYTE + + /** +* Returns type information for primitive [[Boolean]] and [[java.lang.Boolean]]. Does not +* support a null value. 
+*/ + val BOOLEAN: TypeInformation[java.lang.Boolean] = JTypes.BOOLEAN + + /** +* Returns type information for primitive [[Short]] and [[java.lang.Short]]. Does not +* support a null value. +*/ + val SHORT: TypeInformation[java.lang.Short] = JTypes.SHORT + + /** +* Returns type information for primitive [[Int]] and [[java.lang.Integer]]. Does not +* support a null value. +*/ + val INT: TypeInformation[java.lang.Integer] = JTypes.INT + + /** +* Returns type information for primitive [[Long]] and
[jira] [Commented] (FLINK-7452) Add helper methods for all built-in Flink types to Types
[ https://issues.apache.org/jira/browse/FLINK-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291078#comment-16291078 ] ASF GitHub Bot commented on FLINK-7452: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4612#discussion_r156956706 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala --- @@ -116,7 +191,7 @@ object Types { * --- End diff -- Add information about how a Multiset is represented and the nullability of the set and its entries? > Add helper methods for all built-in Flink types to Types > > > Key: FLINK-7452 > URL: https://issues.apache.org/jira/browse/FLINK-7452 > Project: Flink > Issue Type: Improvement > Components: Type Serialization System >Reporter: Timo Walther >Assignee: Timo Walther > > Sometimes it is very difficult to provide `TypeInformation` manually, in case > some extraction fails or is not available. {{TypeHint}}s should be the > preferred way but these methods can ensure correct types. > I propose to add all built-in Flink types to the {{Types}}. Such as: > {code} > Types.POJO(MyPojo.class) > Types.POJO(Map) > Types.GENERIC(Object.class) > Types.TUPLE(TypeInformation, ...) > Types.MAP(TypeInformation, TypeInformation) > {code} > The methods should validate that the returned type is exactly the requested > type. And especially in case of POJO should help create {{PojoTypeInfo}}. > Once this is in place, we can deprecate the {{TypeInfoParser}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
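The issue description asks that each helper "validate that the returned type is exactly the requested type". The following is a minimal, self-contained sketch of that idea only; it does not use Flink and is not the PR's implementation. All names (SketchTypeInfo, SketchPojoInfo, SketchGenericInfo, SketchTypes) are hypothetical, and the "POJO" rule is deliberately simplified to "public class with a public no-argument constructor", which is an assumption made for illustration.

```java
import java.lang.reflect.Modifier;

/** Hypothetical stand-in for a type descriptor; not Flink's TypeInformation. */
interface SketchTypeInfo {
    Class<?> typeClass();
}

/** Descriptor produced when a class satisfies the simplified POJO rule below. */
final class SketchPojoInfo implements SketchTypeInfo {
    private final Class<?> clazz;

    SketchPojoInfo(Class<?> clazz) {
        this.clazz = clazz;
    }

    @Override
    public Class<?> typeClass() {
        return clazz;
    }
}

/** Fallback descriptor for every class that is not recognized as a POJO. */
final class SketchGenericInfo implements SketchTypeInfo {
    private final Class<?> clazz;

    SketchGenericInfo(Class<?> clazz) {
        this.clazz = clazz;
    }

    @Override
    public Class<?> typeClass() {
        return clazz;
    }
}

/** Hypothetical helper class that mirrors the "validate what you asked for" idea. */
final class SketchTypes {
    private SketchTypes() {}

    /** Automatic analysis: may silently fall back to the generic descriptor. */
    static SketchTypeInfo extract(Class<?> clazz) {
        // Assumed, simplified rule: public class with a public no-arg constructor.
        if (Modifier.isPublic(clazz.getModifiers()) && hasPublicNoArgConstructor(clazz)) {
            return new SketchPojoInfo(clazz);
        }
        return new SketchGenericInfo(clazz);
    }

    /** Explicit request: fails loudly instead of returning a different kind of type. */
    static SketchPojoInfo pojo(Class<?> clazz) {
        SketchTypeInfo info = extract(clazz);
        if (!(info instanceof SketchPojoInfo)) {
            throw new IllegalArgumentException(
                "POJO type expected but analysis produced: " + info.getClass().getSimpleName());
        }
        return (SketchPojoInfo) info;
    }

    private static boolean hasPublicNoArgConstructor(Class<?> clazz) {
        try {
            clazz.getConstructor(); // looks up public constructors only
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }
}
```

In this sketch, SketchTypes.extract(Integer.class) quietly returns the generic fallback, while SketchTypes.pojo(Integer.class) throws. That difference between silent fallback and an explicit failure is the behavior the description appears to be asking for.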
[jira] [Commented] (FLINK-7452) Add helper methods for all built-in Flink types to Types
[ https://issues.apache.org/jira/browse/FLINK-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291063#comment-16291063 ] ASF GitHub Bot commented on FLINK-7452: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4612#discussion_r156940557 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/CorrelateStringExpressionTest.scala --- @@ -34,7 +34,7 @@ class CorrelateStringExpressionTest extends TableTestBase { val util = streamTestUtil() val sTab = util.addTable[(Int, Long, String)]('a, 'b, 'c) -val typeInfo = new RowTypeInfo(Seq(Types.INT, Types.LONG, Types.STRING): _*) +val typeInfo = new RowTypeInfo(Seq(typeutils.Types.INT, typeutils.Types.LONG, typeutils.Types.STRING): _*) --- End diff -- line exceeds 100 characters. > Add helper methods for all built-in Flink types to Types > > > Key: FLINK-7452 > URL: https://issues.apache.org/jira/browse/FLINK-7452 > Project: Flink > Issue Type: Improvement > Components: Type Serialization System >Reporter: Timo Walther >Assignee: Timo Walther > > Sometimes it is very difficult to provide `TypeInformation` manually, in case > some extraction fails or is not available. {{TypeHint}}s should be the > preferred way but these methods can ensure correct types. > I propose to add all built-in Flink types to the {{Types}}. Such as: > {code} > Types.POJO(MyPojo.class) > Types.POJO(Map) > Types.GENERIC(Object.class) > Types.TUPLE(TypeInformation, ...) > Types.MAP(TypeInformation, TypeInformation) > {code} > The methods should validate that the returned type is exactly the requested > type. And especially in case of POJO should help create {{PojoTypeInfo}}. > Once this is in place, we can deprecate the {{TypeInfoParser}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7452) Add helper methods for all built-in Flink types to Types
[ https://issues.apache.org/jira/browse/FLINK-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291070#comment-16291070 ] ASF GitHub Bot commented on FLINK-7452: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4612#discussion_r156956540 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala --- @@ -107,7 +180,9 @@ object Types { * @param keyType type of the keys of the map e.g. Types.STRING --- End diff -- Add information about nullability of Map and entries? > Add helper methods for all built-in Flink types to Types > > > Key: FLINK-7452 > URL: https://issues.apache.org/jira/browse/FLINK-7452 > Project: Flink > Issue Type: Improvement > Components: Type Serialization System >Reporter: Timo Walther >Assignee: Timo Walther > > Sometimes it is very difficult to provide `TypeInformation` manually, in case > some extraction fails or is not available. {{TypeHint}}s should be the > preferred way but these methods can ensure correct types. > I propose to add all built-in Flink types to the {{Types}}. Such as: > {code} > Types.POJO(MyPojo.class) > Types.POJO(Map) > Types.GENERIC(Object.class) > Types.TUPLE(TypeInformation, ...) > Types.MAP(TypeInformation, TypeInformation) > {code} > The methods should validate that the returned type is exactly the requested > type. And especially in case of POJO should help create {{PojoTypeInfo}}. > Once this is in place, we can deprecate the {{TypeInfoParser}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7452) Add helper methods for all built-in Flink types to Types
[ https://issues.apache.org/jira/browse/FLINK-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291079#comment-16291079 ] ASF GitHub Bot commented on FLINK-7452: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4612#discussion_r156955846 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala --- @@ -25,55 +27,125 @@ import org.apache.flink.types.Row import _root_.scala.annotation.varargs /** - * This class enumerates all supported types of the Table API. + * This class enumerates all supported types of the Table API & SQL. */ object Types { - val STRING = JTypes.STRING - val BOOLEAN = JTypes.BOOLEAN + /** +* Returns type information for a Table API string or SQL VARCHAR type. +*/ + val STRING: TypeInformation[String] = JTypes.STRING + + /** +* Returns type information for a Table API boolean or SQL BOOLEAN type. +*/ + val BOOLEAN: TypeInformation[lang.Boolean] = JTypes.BOOLEAN + + /** +* Returns type information for a Table API byte or SQL TINYINT type. +*/ + val BYTE: TypeInformation[lang.Byte] = JTypes.BYTE + + /** +* Returns type information for a Table API short or SQL SMALLINT type. +*/ + val SHORT: TypeInformation[lang.Short] = JTypes.SHORT + + /** +* Returns type information for a Table API integer or SQL INT/INTEGER type. +*/ + val INT: TypeInformation[lang.Integer] = JTypes.INT - val BYTE = JTypes.BYTE - val SHORT = JTypes.SHORT - val INT = JTypes.INT - val LONG = JTypes.LONG - val FLOAT = JTypes.FLOAT - val DOUBLE = JTypes.DOUBLE - val DECIMAL = JTypes.DECIMAL + /** +* Returns type information for a Table API long or SQL BIGINT type. 
+*/ + val LONG: TypeInformation[lang.Long] = JTypes.LONG - val SQL_DATE = JTypes.SQL_DATE - val SQL_TIME = JTypes.SQL_TIME - val SQL_TIMESTAMP = JTypes.SQL_TIMESTAMP - val INTERVAL_MONTHS = TimeIntervalTypeInfo.INTERVAL_MONTHS - val INTERVAL_MILLIS = TimeIntervalTypeInfo.INTERVAL_MILLIS + /** +* Returns type information for a Table API float or SQL FLOAT/REAL type. +*/ + val FLOAT: TypeInformation[lang.Float] = JTypes.FLOAT + + /** +* Returns type information for a Table API integer or SQL DOUBLE type. +*/ + val DOUBLE: TypeInformation[lang.Double] = JTypes.DOUBLE /** -* Generates row type information. +* Returns type information for a Table API big decimal or SQL DECIMAL type. +*/ + val DECIMAL: TypeInformation[math.BigDecimal] = JTypes.BIG_DEC + + /** +* Returns type information for a Table API SQL date or SQL DATE type. +*/ + val SQL_DATE: TypeInformation[sql.Date] = JTypes.SQL_DATE --- End diff -- The [Table API docs](https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/tableApi.html#data-types) and [SQL docs](https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/sql.html#data-types) need to be updated for `SQL_DATE`, `SQL_TIME`, and `SQL_TIMESTAMP`. > Add helper methods for all built-in Flink types to Types > > > Key: FLINK-7452 > URL: https://issues.apache.org/jira/browse/FLINK-7452 > Project: Flink > Issue Type: Improvement > Components: Type Serialization System >Reporter: Timo Walther >Assignee: Timo Walther > > Sometimes it is very difficult to provide `TypeInformation` manually, in case > some extraction fails or is not available. {{TypeHint}}s should be the > preferred way but this methods can ensure correct types. > I propose to add all built-in Flink types to the {{Types}}. Such as: > {code} > Types.POJO(MyPojo.class) > Types.POJO(Map) > Types.GENERIC(Object.class) > Types.TUPLE(TypeInformation, ...) 
> Types.MAP(TypeInformation, TypeInformation) > {code} > The methods should validate that the returned type is exactly the requested > type. And especially in case of POJO should help creating {{PojoTypeInfo}}. > Once this is in place, we can deprecate the {{TypeInfoParser}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
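The issue description asks that the helper methods validate that the returned type is exactly the requested type, especially for POJOs. The following is a minimal, stdlib-only sketch of that idea, not Flink's implementation: `PojoCheckSketch`, `PojoDescriptor`, `pojo`, and `rejects` are hypothetical names introduced for illustration, and only two of Flink's POJO rules (public class, public no-argument constructor) are checked. The point is that a factory method can fail fast with a clear error instead of silently falling back to a less efficient generic type.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Modifier;

public class PojoCheckSketch {

    /** Hypothetical descriptor standing in for a validated POJO type. */
    public static final class PojoDescriptor<T> {
        private final Class<T> type;

        private PojoDescriptor(Class<T> type) {
            this.type = type;
        }

        public Class<T> getType() {
            return type;
        }
    }

    /** Satisfies both checks: public class with a public no-argument constructor. */
    public static class Event {
        public String name;
        public int count;

        public Event() {}
    }

    /** Fails the check: there is no no-argument constructor. */
    public static class NoDefaultCtor {
        public String name;

        public NoDefaultCtor(String name) {
            this.name = name;
        }
    }

    /**
     * Returns a descriptor only if the class passes the (simplified) POJO checks;
     * otherwise throws instead of returning some other kind of type.
     */
    public static <T> PojoDescriptor<T> pojo(Class<T> type) {
        if (!Modifier.isPublic(type.getModifiers())) {
            throw new IllegalArgumentException(type.getName() + " is not public");
        }
        try {
            Constructor<T> ctor = type.getDeclaredConstructor();
            if (!Modifier.isPublic(ctor.getModifiers())) {
                throw new IllegalArgumentException(
                    type.getName() + " has no public no-argument constructor");
            }
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(
                type.getName() + " has no public no-argument constructor", e);
        }
        return new PojoDescriptor<>(type);
    }

    /** Convenience helper: true if the class is rejected by {@link #pojo(Class)}. */
    public static boolean rejects(Class<?> type) {
        try {
            pojo(type);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(pojo(Event.class).getType().getSimpleName());
        System.out.println(rejects(NoDefaultCtor.class));
    }
}
```

In this sketch the caller either gets exactly the kind of type that was requested or an exception that names the violated rule, which is the behavior the issue description proposes for the real helper methods.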
[jira] [Commented] (FLINK-7452) Add helper methods for all built-in Flink types to Types
[ https://issues.apache.org/jira/browse/FLINK-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291069#comment-16291069 ] ASF GitHub Bot commented on FLINK-7452: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4612#discussion_r156940261 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala --- @@ -25,55 +27,125 @@ import org.apache.flink.types.Row import _root_.scala.annotation.varargs /** - * This class enumerates all supported types of the Table API. + * This class enumerates all supported types of the Table API & SQL. */ object Types { - val STRING = JTypes.STRING - val BOOLEAN = JTypes.BOOLEAN + /** +* Returns type information for a Table API string or SQL VARCHAR type. +*/ + val STRING: TypeInformation[String] = JTypes.STRING + + /** +* Returns type information for a Table API boolean or SQL BOOLEAN type. +*/ + val BOOLEAN: TypeInformation[lang.Boolean] = JTypes.BOOLEAN + + /** +* Returns type information for a Table API byte or SQL TINYINT type. +*/ + val BYTE: TypeInformation[lang.Byte] = JTypes.BYTE + + /** +* Returns type information for a Table API short or SQL SMALLINT type. +*/ + val SHORT: TypeInformation[lang.Short] = JTypes.SHORT + + /** +* Returns type information for a Table API integer or SQL INT/INTEGER type. +*/ + val INT: TypeInformation[lang.Integer] = JTypes.INT - val BYTE = JTypes.BYTE - val SHORT = JTypes.SHORT - val INT = JTypes.INT - val LONG = JTypes.LONG - val FLOAT = JTypes.FLOAT - val DOUBLE = JTypes.DOUBLE - val DECIMAL = JTypes.DECIMAL + /** +* Returns type information for a Table API long or SQL BIGINT type. 
+*/ + val LONG: TypeInformation[lang.Long] = JTypes.LONG - val SQL_DATE = JTypes.SQL_DATE - val SQL_TIME = JTypes.SQL_TIME - val SQL_TIMESTAMP = JTypes.SQL_TIMESTAMP - val INTERVAL_MONTHS = TimeIntervalTypeInfo.INTERVAL_MONTHS - val INTERVAL_MILLIS = TimeIntervalTypeInfo.INTERVAL_MILLIS + /** +* Returns type information for a Table API float or SQL FLOAT/REAL type. +*/ + val FLOAT: TypeInformation[lang.Float] = JTypes.FLOAT + + /** +* Returns type information for a Table API integer or SQL DOUBLE type. +*/ + val DOUBLE: TypeInformation[lang.Double] = JTypes.DOUBLE /** -* Generates row type information. +* Returns type information for a Table API big decimal or SQL DECIMAL type. +*/ + val DECIMAL: TypeInformation[math.BigDecimal] = JTypes.BIG_DEC + + /** +* Returns type information for a Table API SQL date or SQL DATE type. +*/ + val SQL_DATE: TypeInformation[sql.Date] = JTypes.SQL_DATE + + /** +* Returns type information for a Table API SQL time or SQL TIME type. +*/ + val SQL_TIME: TypeInformation[sql.Time] = JTypes.SQL_TIME + + /** +* Returns type information for a Table API SQL timestamp or SQL TIMESTAMP type. +*/ + val SQL_TIMESTAMP: TypeInformation[sql.Timestamp] = JTypes.SQL_TIMESTAMP + + /** +* Returns type information for a Table API interval of months. +*/ + val INTERVAL_MONTHS: TypeInformation[lang.Integer] = TimeIntervalTypeInfo.INTERVAL_MONTHS + + /** +* Returns type information for a Table API interval milliseconds. +*/ + val INTERVAL_MILLIS: TypeInformation[lang.Long] = TimeIntervalTypeInfo.INTERVAL_MILLIS + + /** +* Returns type information for [[org.apache.flink.types.Row]] with fields of the given types. +* +* A row is a variable-length, null-aware composite type for storing multiple values in a +* deterministic field order. Every field can be null independent of the field's type. +* The type of row fields cannot be automatically inferred; therefore, it is required to pass +* type information whenever a row is used. 
* -* A row type consists of zero or more fields with a field name and a corresponding type. +* The schema of rows can have up to Integer.MAX_VALUE fields, however, all row instances --- End diff -- line exceeds 100 characters. > Add helper methods for all built-in Flink types to Types > > > Key: FLINK-7452 > URL: https://issues.apache.org/jira/browse/FLINK-7452 > Project: Flink > Issue Type: Improvement > Components: Type Serialization System >Reporter: Timo Walther >Assignee:
[jira] [Commented] (FLINK-7452) Add helper methods for all built-in Flink types to Types
[ https://issues.apache.org/jira/browse/FLINK-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291071#comment-16291071 ] ASF GitHub Bot commented on FLINK-7452: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4612#discussion_r156958975 --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/Types.scala --- @@ -0,0 +1,371 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala.typeutils + +import org.apache.flink.annotation.PublicEvolving +import org.apache.flink.api.common.functions.InvalidTypesException +import org.apache.flink.api.common.typeinfo.{TypeInformation, Types => JTypes} +import org.apache.flink.types.Row + +import _root_.scala.collection.JavaConverters._ +import _root_.scala.util.{Either, Try} + +/** + * This class gives access to the type information of the most common Scala types for which Flink + * has built-in serializers and comparators. + * + * This class contains types of [[org.apache.flink.api.common.typeinfo.Types]] and adds + * types for Scala specific classes (such as [[Unit]] or case classes). 
+ * + * In many cases, Flink tries to analyze generic signatures of functions to determine return + * types automatically. This class is intended for cases where type information has to be + * supplied manually or would result in an inefficient type. + * + * Scala macros allow to determine type information of classes and type parameters. You can + * use [[Types.of]] to let type information be determined automatically. + */ +@PublicEvolving +object Types { + + /** +* Generates type information based on the given class and/or its type parameters. +* +* The definition is similar to a [[org.apache.flink.api.common.typeinfo.TypeHint]] but does +* not require to implement anonymous classes. +* +* If the class could not be analyzed by the Scala type analyzer, the Java analyzer +* will be used. +* +* Example use: +* +* `Types.of[(Int, String, String)]` for Scala tuples +* `Types.of[Unit]` for Scala specific types +* +* @tparam T class to be analyzed +*/ + def of[T: TypeInformation]: TypeInformation[T] = { +val typeInfo: TypeInformation[T] = implicitly[TypeInformation[T]] +typeInfo + } + + /** +* Returns type information for Scala [[Nothing]]. Does not support a null value. +*/ + val NOTHING: TypeInformation[Nothing] = new ScalaNothingTypeInfo + + /** +* Returns type information for Scala [[Unit]]. Does not support a null value. +*/ + val UNIT: TypeInformation[Unit] = new UnitTypeInfo + + /** +* Returns type information for [[String]] and [[java.lang.String]]. Supports a null value. +*/ + val STRING: TypeInformation[String] = JTypes.STRING + + /** +* Returns type information for primitive [[Byte]] and [[java.lang.Byte]]. Does not +* support a null value. +*/ + val BYTE: TypeInformation[java.lang.Byte] = JTypes.BYTE + + /** +* Returns type information for primitive [[Boolean]] and [[java.lang.Boolean]]. Does not +* support a null value. 
+*/ + val BOOLEAN: TypeInformation[java.lang.Boolean] = JTypes.BOOLEAN + + /** +* Returns type information for primitive [[Short]] and [[java.lang.Short]]. Does not +* support a null value. +*/ + val SHORT: TypeInformation[java.lang.Short] = JTypes.SHORT + + /** +* Returns type information for primitive [[Int]] and [[java.lang.Integer]]. Does not +* support a null value. +*/ + val INT: TypeInformation[java.lang.Integer] = JTypes.INT + + /** +* Returns type information for primitive [[Long]] and
[jira] [Commented] (FLINK-7452) Add helper methods for all built-in Flink types to Types
[ https://issues.apache.org/jira/browse/FLINK-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291068#comment-16291068 ] ASF GitHub Bot commented on FLINK-7452: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4612#discussion_r156958542 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java --- @@ -19,56 +19,422 @@ package org.apache.flink.api.common.typeinfo; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.InvalidTypesException; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.EitherTypeInfo; +import org.apache.flink.api.java.typeutils.EnumTypeInfo; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.api.java.typeutils.ListTypeInfo; +import org.apache.flink.api.java.typeutils.MapTypeInfo; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; +import org.apache.flink.api.java.typeutils.PojoField; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.api.java.typeutils.ValueTypeInfo; +import org.apache.flink.types.Either; +import org.apache.flink.types.Row; +import org.apache.flink.types.Value; +import java.lang.reflect.Field; import java.math.BigDecimal; +import java.math.BigInteger; import java.sql.Date; import java.sql.Time; import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; /** - * This class gives access to the type information of the most most common types. + * This class gives access to the type information of the most common types for which Flink + * has built-in serializers and comparators. 
+ * + * In many cases, Flink tries to analyze generic signatures of functions to determine return + * types automatically. This class is intended for cases where type information has to be + * supplied manually or would result in an inefficient type. + * + * Please note that the Scala API and Table API provide more specialized Types classes. + * (See org.apache.flink.api.scala.Types and org.apache.flink.table.api.Types) + * + * A more convenient alternative might be a {@link TypeHint}. + * + * @see TypeInformation#of(Class) specify type information based on a class that will be analyzed + * @see TypeInformation#of(TypeHint) specify type information based on a {@link TypeHint} */ @PublicEvolving public class Types { - public static final BasicTypeInfo STRING = BasicTypeInfo.STRING_TYPE_INFO; - public static final BasicTypeInfo BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO; - public static final BasicTypeInfo BYTE = BasicTypeInfo.BYTE_TYPE_INFO; - public static final BasicTypeInfo SHORT = BasicTypeInfo.SHORT_TYPE_INFO; - public static final BasicTypeInfo INT = BasicTypeInfo.INT_TYPE_INFO; - public static final BasicTypeInfo LONG = BasicTypeInfo.LONG_TYPE_INFO; - public static final BasicTypeInfo FLOAT = BasicTypeInfo.FLOAT_TYPE_INFO; - public static final BasicTypeInfo DOUBLE = BasicTypeInfo.DOUBLE_TYPE_INFO; - public static final BasicTypeInfo DECIMAL = BasicTypeInfo.BIG_DEC_TYPE_INFO; + /** +* Returns type information for {@link java.lang.Void}. Does not support a null value. +*/ + public static final TypeInformation VOID = BasicTypeInfo.VOID_TYPE_INFO; + + /** +* Returns type information for {@link java.lang.String}. Supports a null value. +*/ + public static final TypeInformation STRING = BasicTypeInfo.STRING_TYPE_INFO; + + /** +* Returns type information for both a primitive byte and {@link java.lang.Byte}. +* Does not support a null value. 
+*/ + public static final TypeInformation BYTE = BasicTypeInfo.BYTE_TYPE_INFO; + + /** +* Returns type information for both a primitive boolean and {@link java.lang.Boolean}. +* Does not support a null value. +*/ + public static final TypeInformation BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO; + + /** +* Returns type information for both a primitive short and {@link java.lang.Short}. +* Does not support a null value. +*/ + public static final TypeInformation SHORT = BasicTypeInfo.SHORT_TYPE_INFO; + + /** +* Returns type information for both a primitive int and {@link java.lang.Integer}. +* Does not support a null value.
[jira] [Commented] (FLINK-7452) Add helper methods for all built-in Flink types to Types
[ https://issues.apache.org/jira/browse/FLINK-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291074#comment-16291074 ] ASF GitHub Bot commented on FLINK-7452: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4612#discussion_r156938437 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java --- @@ -19,56 +19,422 @@ package org.apache.flink.api.common.typeinfo; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.InvalidTypesException; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.EitherTypeInfo; +import org.apache.flink.api.java.typeutils.EnumTypeInfo; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.api.java.typeutils.ListTypeInfo; +import org.apache.flink.api.java.typeutils.MapTypeInfo; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; +import org.apache.flink.api.java.typeutils.PojoField; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.api.java.typeutils.ValueTypeInfo; +import org.apache.flink.types.Either; +import org.apache.flink.types.Row; +import org.apache.flink.types.Value; +import java.lang.reflect.Field; import java.math.BigDecimal; +import java.math.BigInteger; import java.sql.Date; import java.sql.Time; import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; /** - * This class gives access to the type information of the most most common types. + * This class gives access to the type information of the most common types for which Flink + * has built-in serializers and comparators. 
+ * + * In many cases, Flink tries to analyze generic signatures of functions to determine return + * types automatically. This class is intended for cases where type information has to be + * supplied manually or would result in an inefficient type. + * + * Please note that the Scala API and Table API provide more specialized Types classes. --- End diff -- "provide more specialized" -> "have dedicated"? > Add helper methods for all built-in Flink types to Types > > > Key: FLINK-7452 > URL: https://issues.apache.org/jira/browse/FLINK-7452 > Project: Flink > Issue Type: Improvement > Components: Type Serialization System >Reporter: Timo Walther >Assignee: Timo Walther > > Sometimes it is very difficult to provide `TypeInformation` manually, in case > some extraction fails or is not available. {{TypeHint}}s should be the > preferred way but this methods can ensure correct types. > I propose to add all built-in Flink types to the {{Types}}. Such as: > {code} > Types.POJO(MyPojo.class) > Types.POJO(Map) > Types.GENERIC(Object.class) > Types.TUPLE(TypeInformation, ...) > Types.MAP(TypeInformation, TypeInformation) > {code} > The methods should validate that the returned type is exactly the requested > type. And especially in case of POJO should help creating {{PojoTypeInfo}}. > Once this is in place, we can deprecate the {{TypeInfoParser}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
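The Javadoc quoted above says type information sometimes has to be supplied manually and mentions a type hint as an alternative. The underlying reason is Java type erasure. The following stdlib-only sketch shows the general technique under stated assumptions: `ErasureSketch` and `Hint` are hypothetical names introduced here, and `Hint` is not Flink's `TypeHint`, only an illustration of how an anonymous subclass can preserve a generic type argument.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class ErasureSketch {

    /** Hypothetical stand-in for a type hint; not Flink's TypeHint. */
    public abstract static class Hint<T> {
        private final Type captured;

        protected Hint() {
            // An anonymous subclass records its generic superclass in class metadata,
            // so the type argument survives erasure and can be read back here.
            Type superType = getClass().getGenericSuperclass();
            if (!(superType instanceof ParameterizedType)) {
                throw new IllegalStateException(
                    "Hint must be created as a parameterized anonymous subclass");
            }
            this.captured = ((ParameterizedType) superType).getActualTypeArguments()[0];
        }

        public Type getType() {
            return captured;
        }
    }

    /** Two differently parameterized lists share one runtime class after erasure. */
    public static boolean sameRuntimeClass() {
        List<String> strings = new ArrayList<>();
        List<Integer> ints = new ArrayList<>();
        return strings.getClass() == ints.getClass();
    }

    /** Name of the full generic type captured through the anonymous subclass. */
    public static String hintedTypeName() {
        return new Hint<Map<String, List<Integer>>>() {}.getType().getTypeName();
    }

    public static void main(String[] args) {
        System.out.println(sameRuntimeClass());
        System.out.println(hintedTypeName());
    }
}
```

The runtime class alone cannot distinguish `List<String>` from `List<Integer>`, which is why automatic extraction can fail or fall back to a generic type. Explicit constants and factory methods, or a hint object, carry the missing information.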
[GitHub] flink pull request #4612: [FLINK-7452] [types] Add helper methods for all bu...
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4612#discussion_r156956540

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala ---
    @@ -107,7 +180,9 @@ object Types {
        * @param keyType type of the keys of the map e.g. Types.STRING
    --- End diff --

    Add information about nullability of Map and entries?

---
[GitHub] flink pull request #4612: [FLINK-7452] [types] Add helper methods for all bu...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4612#discussion_r156937819 --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/Types.scala --- @@ -0,0 +1,371 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala.typeutils + +import org.apache.flink.annotation.PublicEvolving +import org.apache.flink.api.common.functions.InvalidTypesException +import org.apache.flink.api.common.typeinfo.{TypeInformation, Types => JTypes} +import org.apache.flink.types.Row + +import _root_.scala.collection.JavaConverters._ +import _root_.scala.util.{Either, Try} + +/** + * This class gives access to the type information of the most common Scala types for which Flink + * has built-in serializers and comparators. + * + * This class contains types of [[org.apache.flink.api.common.typeinfo.Types]] and adds + * types for Scala specific classes (such as [[Unit]] or case classes). + * + * In many cases, Flink tries to analyze generic signatures of functions to determine return + * types automatically. This class is intended for cases where type information has to be + * supplied manually or would result in an inefficient type. 
+ * + * Scala macros allow to determine type information of classes and type parameters. You can + * use [[Types.of]] to let type information be determined automatically. + */ +@PublicEvolving +object Types { + + /** +* Generates type information based on the given class and/or its type parameters. +* +* The definition is similar to a [[org.apache.flink.api.common.typeinfo.TypeHint]] but does +* not require to implement anonymous classes. +* +* If the class could not be analyzed by the Scala type analyzer, the Java analyzer +* will be used. +* +* Example use: +* +* `Types.of[(Int, String, String)]` for Scala tuples +* `Types.of[Unit]` for Scala specific types +* +* @tparam T class to be analyzed +*/ + def of[T: TypeInformation]: TypeInformation[T] = { +val typeInfo: TypeInformation[T] = implicitly[TypeInformation[T]] +typeInfo + } + + /** +* Returns type information for Scala [[Nothing]]. Does not support a null value. +*/ + val NOTHING: TypeInformation[Nothing] = new ScalaNothingTypeInfo + + /** +* Returns type information for Scala [[Unit]]. Does not support a null value. +*/ + val UNIT: TypeInformation[Unit] = new UnitTypeInfo + + /** +* Returns type information for [[String]] and [[java.lang.String]]. Supports a null value. +*/ + val STRING: TypeInformation[String] = JTypes.STRING + + /** +* Returns type information for primitive [[Byte]] and [[java.lang.Byte]]. Does not +* support a null value. +*/ + val BYTE: TypeInformation[java.lang.Byte] = JTypes.BYTE + + /** +* Returns type information for primitive [[Boolean]] and [[java.lang.Boolean]]. Does not +* support a null value. +*/ + val BOOLEAN: TypeInformation[java.lang.Boolean] = JTypes.BOOLEAN + + /** +* Returns type information for primitive [[Short]] and [[java.lang.Short]]. Does not +* support a null value. +*/ + val SHORT: TypeInformation[java.lang.Short] = JTypes.SHORT + + /** +* Returns type information for primitive [[Int]] and [[java.lang.Integer]]. Does not +* support a null value. 
+*/ + val INT: TypeInformation[java.lang.Integer] = JTypes.INT + + /** +* Returns type information for primitive [[Long]] and [[java.lang.Long]]. Does not +* support a null value. +*/ + val LONG: TypeInformation[java.lang.Long] = JTypes.LONG + + /** +* Returns type information for primitive [[Float]] and [[java.lang.Float]]. Does not
[jira] [Commented] (FLINK-7452) Add helper methods for all built-in Flink types to Types
[ https://issues.apache.org/jira/browse/FLINK-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291077#comment-16291077 ] ASF GitHub Bot commented on FLINK-7452: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4612#discussion_r156951752 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java --- @@ -19,56 +19,422 @@ package org.apache.flink.api.common.typeinfo; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.InvalidTypesException; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.EitherTypeInfo; +import org.apache.flink.api.java.typeutils.EnumTypeInfo; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.api.java.typeutils.ListTypeInfo; +import org.apache.flink.api.java.typeutils.MapTypeInfo; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; +import org.apache.flink.api.java.typeutils.PojoField; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.api.java.typeutils.ValueTypeInfo; +import org.apache.flink.types.Either; +import org.apache.flink.types.Row; +import org.apache.flink.types.Value; +import java.lang.reflect.Field; import java.math.BigDecimal; +import java.math.BigInteger; import java.sql.Date; import java.sql.Time; import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; /** - * This class gives access to the type information of the most most common types. + * This class gives access to the type information of the most common types for which Flink + * has built-in serializers and comparators. 
+ * + * In many cases, Flink tries to analyze generic signatures of functions to determine return + * types automatically. This class is intended for cases where type information has to be + * supplied manually or would result in an inefficient type. + * + * Please note that the Scala API and Table API provide more specialized Types classes. + * (See org.apache.flink.api.scala.Types and org.apache.flink.table.api.Types) + * + * A more convenient alternative might be a {@link TypeHint}. + * + * @see TypeInformation#of(Class) specify type information based on a class that will be analyzed + * @see TypeInformation#of(TypeHint) specify type information based on a {@link TypeHint} */ @PublicEvolving public class Types { - public static final BasicTypeInfo STRING = BasicTypeInfo.STRING_TYPE_INFO; - public static final BasicTypeInfo BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO; - public static final BasicTypeInfo BYTE = BasicTypeInfo.BYTE_TYPE_INFO; - public static final BasicTypeInfo SHORT = BasicTypeInfo.SHORT_TYPE_INFO; - public static final BasicTypeInfo INT = BasicTypeInfo.INT_TYPE_INFO; - public static final BasicTypeInfo LONG = BasicTypeInfo.LONG_TYPE_INFO; - public static final BasicTypeInfo FLOAT = BasicTypeInfo.FLOAT_TYPE_INFO; - public static final BasicTypeInfo DOUBLE = BasicTypeInfo.DOUBLE_TYPE_INFO; - public static final BasicTypeInfo DECIMAL = BasicTypeInfo.BIG_DEC_TYPE_INFO; + /** +* Returns type information for {@link java.lang.Void}. Does not support a null value. +*/ + public static final TypeInformation VOID = BasicTypeInfo.VOID_TYPE_INFO; + + /** +* Returns type information for {@link java.lang.String}. Supports a null value. +*/ + public static final TypeInformation STRING = BasicTypeInfo.STRING_TYPE_INFO; + + /** +* Returns type information for both a primitive byte and {@link java.lang.Byte}. +* Does not support a null value. 
+*/ + public static final TypeInformation BYTE = BasicTypeInfo.BYTE_TYPE_INFO; + + /** +* Returns type information for both a primitive boolean and {@link java.lang.Boolean}. +* Does not support a null value. +*/ + public static final TypeInformation BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO; + + /** +* Returns type information for both a primitive short and {@link java.lang.Short}. +* Does not support a null value. +*/ + public static final TypeInformation SHORT = BasicTypeInfo.SHORT_TYPE_INFO; + + /** +* Returns type information for both a primitive int and {@link java.lang.Integer}. +* Does not support a null value.
[jira] [Commented] (FLINK-7452) Add helper methods for all built-in Flink types to Types
[ https://issues.apache.org/jira/browse/FLINK-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291066#comment-16291066 ] ASF GitHub Bot commented on FLINK-7452: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4612#discussion_r156941632 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java --- @@ -19,56 +19,422 @@ package org.apache.flink.api.common.typeinfo; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.InvalidTypesException; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.EitherTypeInfo; +import org.apache.flink.api.java.typeutils.EnumTypeInfo; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.api.java.typeutils.ListTypeInfo; +import org.apache.flink.api.java.typeutils.MapTypeInfo; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; +import org.apache.flink.api.java.typeutils.PojoField; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.api.java.typeutils.ValueTypeInfo; +import org.apache.flink.types.Either; +import org.apache.flink.types.Row; +import org.apache.flink.types.Value; +import java.lang.reflect.Field; import java.math.BigDecimal; +import java.math.BigInteger; import java.sql.Date; import java.sql.Time; import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; /** - * This class gives access to the type information of the most most common types. + * This class gives access to the type information of the most common types for which Flink + * has built-in serializers and comparators. 
+ * + * In many cases, Flink tries to analyze generic signatures of functions to determine return + * types automatically. This class is intended for cases where type information has to be + * supplied manually or would result in an inefficient type. + * + * Please note that the Scala API and Table API provide more specialized Types classes. + * (See org.apache.flink.api.scala.Types and org.apache.flink.table.api.Types) + * + * A more convenient alternative might be a {@link TypeHint}. + * + * @see TypeInformation#of(Class) specify type information based on a class that will be analyzed + * @see TypeInformation#of(TypeHint) specify type information based on a {@link TypeHint} */ @PublicEvolving public class Types { - public static final BasicTypeInfo STRING = BasicTypeInfo.STRING_TYPE_INFO; - public static final BasicTypeInfo BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO; - public static final BasicTypeInfo BYTE = BasicTypeInfo.BYTE_TYPE_INFO; - public static final BasicTypeInfo SHORT = BasicTypeInfo.SHORT_TYPE_INFO; - public static final BasicTypeInfo INT = BasicTypeInfo.INT_TYPE_INFO; - public static final BasicTypeInfo LONG = BasicTypeInfo.LONG_TYPE_INFO; - public static final BasicTypeInfo FLOAT = BasicTypeInfo.FLOAT_TYPE_INFO; - public static final BasicTypeInfo DOUBLE = BasicTypeInfo.DOUBLE_TYPE_INFO; - public static final BasicTypeInfo DECIMAL = BasicTypeInfo.BIG_DEC_TYPE_INFO; + /** +* Returns type information for {@link java.lang.Void}. Does not support a null value. +*/ + public static final TypeInformation VOID = BasicTypeInfo.VOID_TYPE_INFO; + + /** +* Returns type information for {@link java.lang.String}. Supports a null value. +*/ + public static final TypeInformation STRING = BasicTypeInfo.STRING_TYPE_INFO; + + /** +* Returns type information for both a primitive byte and {@link java.lang.Byte}. +* Does not support a null value. 
+*/ + public static final TypeInformation BYTE = BasicTypeInfo.BYTE_TYPE_INFO; + + /** +* Returns type information for both a primitive boolean and {@link java.lang.Boolean}. +* Does not support a null value. +*/ + public static final TypeInformation BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO; + + /** +* Returns type information for both a primitive short and {@link java.lang.Short}. +* Does not support a null value. +*/ + public static final TypeInformation SHORT = BasicTypeInfo.SHORT_TYPE_INFO; + + /** +* Returns type information for both a primitive int and {@link java.lang.Integer}. +* Does not support a null value.
[jira] [Commented] (FLINK-7452) Add helper methods for all built-in Flink types to Types
[ https://issues.apache.org/jira/browse/FLINK-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291080#comment-16291080 ] ASF GitHub Bot commented on FLINK-7452: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4612#discussion_r156942641 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java --- @@ -19,56 +19,422 @@ package org.apache.flink.api.common.typeinfo; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.InvalidTypesException; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.EitherTypeInfo; +import org.apache.flink.api.java.typeutils.EnumTypeInfo; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.api.java.typeutils.ListTypeInfo; +import org.apache.flink.api.java.typeutils.MapTypeInfo; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; +import org.apache.flink.api.java.typeutils.PojoField; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.api.java.typeutils.ValueTypeInfo; +import org.apache.flink.types.Either; +import org.apache.flink.types.Row; +import org.apache.flink.types.Value; +import java.lang.reflect.Field; import java.math.BigDecimal; +import java.math.BigInteger; import java.sql.Date; import java.sql.Time; import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; /** - * This class gives access to the type information of the most most common types. + * This class gives access to the type information of the most common types for which Flink + * has built-in serializers and comparators. 
+ * + * In many cases, Flink tries to analyze generic signatures of functions to determine return + * types automatically. This class is intended for cases where type information has to be + * supplied manually or would result in an inefficient type. + * + * Please note that the Scala API and Table API provide more specialized Types classes. + * (See org.apache.flink.api.scala.Types and org.apache.flink.table.api.Types) + * + * A more convenient alternative might be a {@link TypeHint}. + * + * @see TypeInformation#of(Class) specify type information based on a class that will be analyzed + * @see TypeInformation#of(TypeHint) specify type information based on a {@link TypeHint} */ @PublicEvolving public class Types { - public static final BasicTypeInfo STRING = BasicTypeInfo.STRING_TYPE_INFO; - public static final BasicTypeInfo BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO; - public static final BasicTypeInfo BYTE = BasicTypeInfo.BYTE_TYPE_INFO; - public static final BasicTypeInfo SHORT = BasicTypeInfo.SHORT_TYPE_INFO; - public static final BasicTypeInfo INT = BasicTypeInfo.INT_TYPE_INFO; - public static final BasicTypeInfo LONG = BasicTypeInfo.LONG_TYPE_INFO; - public static final BasicTypeInfo FLOAT = BasicTypeInfo.FLOAT_TYPE_INFO; - public static final BasicTypeInfo DOUBLE = BasicTypeInfo.DOUBLE_TYPE_INFO; - public static final BasicTypeInfo DECIMAL = BasicTypeInfo.BIG_DEC_TYPE_INFO; + /** +* Returns type information for {@link java.lang.Void}. Does not support a null value. +*/ + public static final TypeInformation VOID = BasicTypeInfo.VOID_TYPE_INFO; + + /** +* Returns type information for {@link java.lang.String}. Supports a null value. +*/ + public static final TypeInformation STRING = BasicTypeInfo.STRING_TYPE_INFO; + + /** +* Returns type information for both a primitive byte and {@link java.lang.Byte}. +* Does not support a null value. 
+*/ + public static final TypeInformation BYTE = BasicTypeInfo.BYTE_TYPE_INFO; + + /** +* Returns type information for both a primitive boolean and {@link java.lang.Boolean}. +* Does not support a null value. +*/ + public static final TypeInformation BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO; + + /** +* Returns type information for both a primitive short and {@link java.lang.Short}. +* Does not support a null value. +*/ + public static final TypeInformation SHORT = BasicTypeInfo.SHORT_TYPE_INFO; + + /** +* Returns type information for both a primitive int and {@link java.lang.Integer}. +* Does not support a null value.
[GitHub] flink pull request #4612: [FLINK-7452] [types] Add helper methods for all bu...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4612#discussion_r156938239 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java --- @@ -19,56 +19,417 @@ package org.apache.flink.api.common.typeinfo; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.InvalidTypesException; +import org.apache.flink.api.java.typeutils.EitherTypeInfo; +import org.apache.flink.api.java.typeutils.EnumTypeInfo; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.api.java.typeutils.ListTypeInfo; +import org.apache.flink.api.java.typeutils.MapTypeInfo; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; +import org.apache.flink.api.java.typeutils.PojoField; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.api.java.typeutils.ValueTypeInfo; -import java.math.BigDecimal; -import java.sql.Date; -import java.sql.Time; -import java.sql.Timestamp; +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; /** - * This class gives access to the type information of the most most common types. + * This class gives access to the type information of the most common types for which Flink + * has built-in serializers and comparators. + * + * In many cases, Flink tries to analyze generic signatures of functions to determine return + * types automatically. This class is intended for cases where type information has to be + * supplied manually or would result in an inefficient type. --- End diff -- rephrase as suggested? ---
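For context on the Javadoc paragraph quoted above: one common reason automatic analysis can fall short in Java is type erasure, which is why type information sometimes has to be supplied by hand. The sketch below uses only the JDK. `Hint` and `ErasureDemo` are hypothetical names made up for illustration; this is not Flink's `TypeHint` or `Types` class, only the general "anonymous subclass captures its type argument" idea under that assumption.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a type hint; NOT Flink's TypeHint class.
abstract class Hint<T> {
    private final Type captured;

    protected Hint() {
        // An anonymous subclass records its generic superclass, e.g. Hint<List<String>>,
        // in its class file, so the type argument survives erasure.
        Type superType = getClass().getGenericSuperclass();
        if (!(superType instanceof ParameterizedType)) {
            throw new IllegalStateException("Create the hint as: new Hint<X>() {}");
        }
        this.captured = ((ParameterizedType) superType).getActualTypeArguments()[0];
    }

    Type type() {
        return captured;
    }
}

// Hypothetical demo class comparing the runtime class with the hinted type.
class ErasureDemo {
    // The runtime class of an object carries no element type.
    static String runtimeClassName(Object o) {
        return o.getClass().getName();
    }

    // The hint keeps the full generic type.
    static String hintedTypeName() {
        return new Hint<List<String>>() {}.type().getTypeName();
    }

    public static void main(String[] args) {
        List<String> strings = new ArrayList<>();
        System.out.println(runtimeClassName(strings));
        System.out.println(hintedTypeName());
    }
}
```

The first call only sees the raw `ArrayList` class, while the hint still knows the element type. A class of ready-made constants and factory methods, as proposed in this pull request, avoids having to write such anonymous subclasses for the common cases.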
[GitHub] flink pull request #4612: [FLINK-7452] [types] Add helper methods for all bu...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4612#discussion_r156941632 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java --- @@ -19,56 +19,422 @@ package org.apache.flink.api.common.typeinfo; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.InvalidTypesException; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.EitherTypeInfo; +import org.apache.flink.api.java.typeutils.EnumTypeInfo; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.api.java.typeutils.ListTypeInfo; +import org.apache.flink.api.java.typeutils.MapTypeInfo; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; +import org.apache.flink.api.java.typeutils.PojoField; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.api.java.typeutils.ValueTypeInfo; +import org.apache.flink.types.Either; +import org.apache.flink.types.Row; +import org.apache.flink.types.Value; +import java.lang.reflect.Field; import java.math.BigDecimal; +import java.math.BigInteger; import java.sql.Date; import java.sql.Time; import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; /** - * This class gives access to the type information of the most most common types. + * This class gives access to the type information of the most common types for which Flink + * has built-in serializers and comparators. + * + * In many cases, Flink tries to analyze generic signatures of functions to determine return + * types automatically. This class is intended for cases where type information has to be + * supplied manually or would result in an inefficient type. 
+ * + * Please note that the Scala API and Table API provide more specialized Types classes. + * (See org.apache.flink.api.scala.Types and org.apache.flink.table.api.Types) + * + * A more convenient alternative might be a {@link TypeHint}. + * + * @see TypeInformation#of(Class) specify type information based on a class that will be analyzed + * @see TypeInformation#of(TypeHint) specify type information based on a {@link TypeHint} */ @PublicEvolving public class Types { - public static final BasicTypeInfo STRING = BasicTypeInfo.STRING_TYPE_INFO; - public static final BasicTypeInfo BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO; - public static final BasicTypeInfo BYTE = BasicTypeInfo.BYTE_TYPE_INFO; - public static final BasicTypeInfo SHORT = BasicTypeInfo.SHORT_TYPE_INFO; - public static final BasicTypeInfo INT = BasicTypeInfo.INT_TYPE_INFO; - public static final BasicTypeInfo LONG = BasicTypeInfo.LONG_TYPE_INFO; - public static final BasicTypeInfo FLOAT = BasicTypeInfo.FLOAT_TYPE_INFO; - public static final BasicTypeInfo DOUBLE = BasicTypeInfo.DOUBLE_TYPE_INFO; - public static final BasicTypeInfo DECIMAL = BasicTypeInfo.BIG_DEC_TYPE_INFO; + /** +* Returns type information for {@link java.lang.Void}. Does not support a null value. +*/ + public static final TypeInformation VOID = BasicTypeInfo.VOID_TYPE_INFO; + + /** +* Returns type information for {@link java.lang.String}. Supports a null value. +*/ + public static final TypeInformation STRING = BasicTypeInfo.STRING_TYPE_INFO; + + /** +* Returns type information for both a primitive byte and {@link java.lang.Byte}. +* Does not support a null value. +*/ + public static final TypeInformation BYTE = BasicTypeInfo.BYTE_TYPE_INFO; + + /** +* Returns type information for both a primitive boolean and {@link java.lang.Boolean}. +* Does not support a null value. 
+*/ + public static final TypeInformation BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO; + + /** +* Returns type information for both a primitive short and {@link java.lang.Short}. +* Does not support a null value. +*/ + public static final TypeInformation SHORT = BasicTypeInfo.SHORT_TYPE_INFO; + + /** +* Returns type information for both a primitive int and {@link java.lang.Integer}. +* Does not support a null value. +*/ + public static final TypeInformation INT = BasicTypeInfo.INT_TYPE_INFO; + + /** +* Returns type information for both a primitive long and {@link java.lang.Long}. +* Does not support a null
[GitHub] flink pull request #4612: [FLINK-7452] [types] Add helper methods for all bu...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4612#discussion_r156940261 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala --- @@ -25,55 +27,125 @@ import org.apache.flink.types.Row import _root_.scala.annotation.varargs /** - * This class enumerates all supported types of the Table API. + * This class enumerates all supported types of the Table API & SQL. */ object Types { - val STRING = JTypes.STRING - val BOOLEAN = JTypes.BOOLEAN + /** +* Returns type information for a Table API string or SQL VARCHAR type. +*/ + val STRING: TypeInformation[String] = JTypes.STRING + + /** +* Returns type information for a Table API boolean or SQL BOOLEAN type. +*/ + val BOOLEAN: TypeInformation[lang.Boolean] = JTypes.BOOLEAN + + /** +* Returns type information for a Table API byte or SQL TINYINT type. +*/ + val BYTE: TypeInformation[lang.Byte] = JTypes.BYTE + + /** +* Returns type information for a Table API short or SQL SMALLINT type. +*/ + val SHORT: TypeInformation[lang.Short] = JTypes.SHORT + + /** +* Returns type information for a Table API integer or SQL INT/INTEGER type. +*/ + val INT: TypeInformation[lang.Integer] = JTypes.INT - val BYTE = JTypes.BYTE - val SHORT = JTypes.SHORT - val INT = JTypes.INT - val LONG = JTypes.LONG - val FLOAT = JTypes.FLOAT - val DOUBLE = JTypes.DOUBLE - val DECIMAL = JTypes.DECIMAL + /** +* Returns type information for a Table API long or SQL BIGINT type. +*/ + val LONG: TypeInformation[lang.Long] = JTypes.LONG - val SQL_DATE = JTypes.SQL_DATE - val SQL_TIME = JTypes.SQL_TIME - val SQL_TIMESTAMP = JTypes.SQL_TIMESTAMP - val INTERVAL_MONTHS = TimeIntervalTypeInfo.INTERVAL_MONTHS - val INTERVAL_MILLIS = TimeIntervalTypeInfo.INTERVAL_MILLIS + /** +* Returns type information for a Table API float or SQL FLOAT/REAL type. 
+*/ + val FLOAT: TypeInformation[lang.Float] = JTypes.FLOAT + + /** +* Returns type information for a Table API integer or SQL DOUBLE type. +*/ + val DOUBLE: TypeInformation[lang.Double] = JTypes.DOUBLE /** -* Generates row type information. +* Returns type information for a Table API big decimal or SQL DECIMAL type. +*/ + val DECIMAL: TypeInformation[math.BigDecimal] = JTypes.BIG_DEC + + /** +* Returns type information for a Table API SQL date or SQL DATE type. +*/ + val SQL_DATE: TypeInformation[sql.Date] = JTypes.SQL_DATE + + /** +* Returns type information for a Table API SQL time or SQL TIME type. +*/ + val SQL_TIME: TypeInformation[sql.Time] = JTypes.SQL_TIME + + /** +* Returns type information for a Table API SQL timestamp or SQL TIMESTAMP type. +*/ + val SQL_TIMESTAMP: TypeInformation[sql.Timestamp] = JTypes.SQL_TIMESTAMP + + /** +* Returns type information for a Table API interval of months. +*/ + val INTERVAL_MONTHS: TypeInformation[lang.Integer] = TimeIntervalTypeInfo.INTERVAL_MONTHS + + /** +* Returns type information for a Table API interval milliseconds. +*/ + val INTERVAL_MILLIS: TypeInformation[lang.Long] = TimeIntervalTypeInfo.INTERVAL_MILLIS + + /** +* Returns type information for [[org.apache.flink.types.Row]] with fields of the given types. +* +* A row is a variable-length, null-aware composite type for storing multiple values in a +* deterministic field order. Every field can be null independent of the field's type. +* The type of row fields cannot be automatically inferred; therefore, it is required to pass +* type information whenever a row is used. * -* A row type consists of zero or more fields with a field name and a corresponding type. +* The schema of rows can have up to Integer.MAX_VALUE fields, however, all row instances --- End diff -- line exceeds 100 characters. ---
[GitHub] flink pull request #4612: [FLINK-7452] [types] Add helper methods for all bu...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4612#discussion_r156940309 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala --- @@ -25,55 +27,125 @@ import org.apache.flink.types.Row import _root_.scala.annotation.varargs /** - * This class enumerates all supported types of the Table API. + * This class enumerates all supported types of the Table API & SQL. */ object Types { - val STRING = JTypes.STRING - val BOOLEAN = JTypes.BOOLEAN + /** +* Returns type information for a Table API string or SQL VARCHAR type. +*/ + val STRING: TypeInformation[String] = JTypes.STRING + + /** +* Returns type information for a Table API boolean or SQL BOOLEAN type. +*/ + val BOOLEAN: TypeInformation[lang.Boolean] = JTypes.BOOLEAN + + /** +* Returns type information for a Table API byte or SQL TINYINT type. +*/ + val BYTE: TypeInformation[lang.Byte] = JTypes.BYTE + + /** +* Returns type information for a Table API short or SQL SMALLINT type. +*/ + val SHORT: TypeInformation[lang.Short] = JTypes.SHORT + + /** +* Returns type information for a Table API integer or SQL INT/INTEGER type. +*/ + val INT: TypeInformation[lang.Integer] = JTypes.INT - val BYTE = JTypes.BYTE - val SHORT = JTypes.SHORT - val INT = JTypes.INT - val LONG = JTypes.LONG - val FLOAT = JTypes.FLOAT - val DOUBLE = JTypes.DOUBLE - val DECIMAL = JTypes.DECIMAL + /** +* Returns type information for a Table API long or SQL BIGINT type. +*/ + val LONG: TypeInformation[lang.Long] = JTypes.LONG - val SQL_DATE = JTypes.SQL_DATE - val SQL_TIME = JTypes.SQL_TIME - val SQL_TIMESTAMP = JTypes.SQL_TIMESTAMP - val INTERVAL_MONTHS = TimeIntervalTypeInfo.INTERVAL_MONTHS - val INTERVAL_MILLIS = TimeIntervalTypeInfo.INTERVAL_MILLIS + /** +* Returns type information for a Table API float or SQL FLOAT/REAL type. 
+*/ + val FLOAT: TypeInformation[lang.Float] = JTypes.FLOAT + + /** +* Returns type information for a Table API integer or SQL DOUBLE type. +*/ + val DOUBLE: TypeInformation[lang.Double] = JTypes.DOUBLE /** -* Generates row type information. +* Returns type information for a Table API big decimal or SQL DECIMAL type. +*/ + val DECIMAL: TypeInformation[math.BigDecimal] = JTypes.BIG_DEC + + /** +* Returns type information for a Table API SQL date or SQL DATE type. +*/ + val SQL_DATE: TypeInformation[sql.Date] = JTypes.SQL_DATE + + /** +* Returns type information for a Table API SQL time or SQL TIME type. +*/ + val SQL_TIME: TypeInformation[sql.Time] = JTypes.SQL_TIME + + /** +* Returns type information for a Table API SQL timestamp or SQL TIMESTAMP type. +*/ + val SQL_TIMESTAMP: TypeInformation[sql.Timestamp] = JTypes.SQL_TIMESTAMP + + /** +* Returns type information for a Table API interval of months. +*/ + val INTERVAL_MONTHS: TypeInformation[lang.Integer] = TimeIntervalTypeInfo.INTERVAL_MONTHS + + /** +* Returns type information for a Table API interval milliseconds. +*/ + val INTERVAL_MILLIS: TypeInformation[lang.Long] = TimeIntervalTypeInfo.INTERVAL_MILLIS + + /** +* Returns type information for [[org.apache.flink.types.Row]] with fields of the given types. +* +* A row is a variable-length, null-aware composite type for storing multiple values in a +* deterministic field order. Every field can be null independent of the field's type. +* The type of row fields cannot be automatically inferred; therefore, it is required to pass +* type information whenever a row is used. * -* A row type consists of zero or more fields with a field name and a corresponding type. +* The schema of rows can have up to Integer.MAX_VALUE fields, however, all row instances +* must have the same length otherwise serialization fails or information is lost. * -* The fields have the default names (f0, f1, f2 ..). 
+* This method generates type information with fields of the given types; the fields have +* the default names (f0, f1, f2 ..). * -* @param types types of row fields; e.g. Types.STRING, Types.INT +* @param types The types of the row fields, e.g., Types.STRING, Types.INT */ @varargs def ROW(types: TypeInformation[_]*): TypeInformation[Row] = { JTypes.ROW(types: _*) } /** -* Generates row type information. +* Returns
[GitHub] flink pull request #4612: [FLINK-7452] [types] Add helper methods for all bu...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4612#discussion_r156937828 --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/Types.scala --- @@ -0,0 +1,371 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala.typeutils + +import org.apache.flink.annotation.PublicEvolving +import org.apache.flink.api.common.functions.InvalidTypesException +import org.apache.flink.api.common.typeinfo.{TypeInformation, Types => JTypes} +import org.apache.flink.types.Row + +import _root_.scala.collection.JavaConverters._ +import _root_.scala.util.{Either, Try} + +/** + * This class gives access to the type information of the most common Scala types for which Flink + * has built-in serializers and comparators. + * + * This class contains types of [[org.apache.flink.api.common.typeinfo.Types]] and adds + * types for Scala specific classes (such as [[Unit]] or case classes). + * + * In many cases, Flink tries to analyze generic signatures of functions to determine return + * types automatically. This class is intended for cases where type information has to be + * supplied manually or would result in an inefficient type. 
+ * + * Scala macros allow to determine type information of classes and type parameters. You can + * use [[Types.of]] to let type information be determined automatically. + */ +@PublicEvolving +object Types { + + /** +* Generates type information based on the given class and/or its type parameters. +* +* The definition is similar to a [[org.apache.flink.api.common.typeinfo.TypeHint]] but does +* not require to implement anonymous classes. +* +* If the class could not be analyzed by the Scala type analyzer, the Java analyzer +* will be used. +* +* Example use: +* +* `Types.of[(Int, String, String)]` for Scala tuples +* `Types.of[Unit]` for Scala specific types +* +* @tparam T class to be analyzed +*/ + def of[T: TypeInformation]: TypeInformation[T] = { +val typeInfo: TypeInformation[T] = implicitly[TypeInformation[T]] +typeInfo + } + + /** +* Returns type information for Scala [[Nothing]]. Does not support a null value. +*/ + val NOTHING: TypeInformation[Nothing] = new ScalaNothingTypeInfo + + /** +* Returns type information for Scala [[Unit]]. Does not support a null value. +*/ + val UNIT: TypeInformation[Unit] = new UnitTypeInfo + + /** +* Returns type information for [[String]] and [[java.lang.String]]. Supports a null value. +*/ + val STRING: TypeInformation[String] = JTypes.STRING + + /** +* Returns type information for primitive [[Byte]] and [[java.lang.Byte]]. Does not +* support a null value. +*/ + val BYTE: TypeInformation[java.lang.Byte] = JTypes.BYTE + + /** +* Returns type information for primitive [[Boolean]] and [[java.lang.Boolean]]. Does not +* support a null value. +*/ + val BOOLEAN: TypeInformation[java.lang.Boolean] = JTypes.BOOLEAN + + /** +* Returns type information for primitive [[Short]] and [[java.lang.Short]]. Does not +* support a null value. +*/ + val SHORT: TypeInformation[java.lang.Short] = JTypes.SHORT + + /** +* Returns type information for primitive [[Int]] and [[java.lang.Integer]]. Does not +* support a null value. 
+*/ + val INT: TypeInformation[java.lang.Integer] = JTypes.INT + + /** +* Returns type information for primitive [[Long]] and [[java.lang.Long]]. Does not +* support a null value. +*/ + val LONG: TypeInformation[java.lang.Long] = JTypes.LONG + + /** +* Returns type information for primitive [[Float]] and [[java.lang.Float]]. Does not
[GitHub] flink pull request #4612: [FLINK-7452] [types] Add helper methods for all bu...
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4612#discussion_r156940557

    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/stringexpr/CorrelateStringExpressionTest.scala ---
    @@ -34,7 +34,7 @@ class CorrelateStringExpressionTest extends TableTestBase {
     val util = streamTestUtil()
     val sTab = util.addTable[(Int, Long, String)]('a, 'b, 'c)
    -val typeInfo = new RowTypeInfo(Seq(Types.INT, Types.LONG, Types.STRING): _*)
    +val typeInfo = new RowTypeInfo(Seq(typeutils.Types.INT, typeutils.Types.LONG, typeutils.Types.STRING): _*)
    --- End diff --

    line exceeds 100 characters.

---
[jira] [Commented] (FLINK-8087) Decouple Slot from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-8087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291016#comment-16291016 ] ASF GitHub Bot commented on FLINK-8087: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5088 > Decouple Slot from SlotPool > --- > > Key: FLINK-8087 > URL: https://issues.apache.org/jira/browse/FLINK-8087 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > > In order to let the {{SlotPool}} return a different {{LogicalSlot}} > implementation than {{SimpleSlot}}, we should not store the {{Slot}} inside of > the {{SlotPool}}. Moreover, we should introduce an abstraction for the > {{AllocatedSlot}} which contains the information required by the > {{SimpleSlot}}. That way we decouple the {{SimpleSlot}} from the > {{AllocatedSlot}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8089) Fulfil slot requests with unused offered slots
[ https://issues.apache.org/jira/browse/FLINK-8089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291014#comment-16291014 ] ASF GitHub Bot commented on FLINK-8089: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5090 > Fulfil slot requests with unused offered slots > -- > > Key: FLINK-8089 > URL: https://issues.apache.org/jira/browse/FLINK-8089 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > > The {{SlotPool}} adds unused offered slots to the list of available slots > without checking whether another pending slot request could be fulfilled with > this slot. This should be changed. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
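The behaviour requested in FLINK-8089 can be illustrated with a toy model. This is a minimal sketch, assuming slots and requests are identified by plain strings and that pending requests are served in arrival order; `ToySlotPool` and all of its methods are hypothetical and do not reflect the real {{SlotPool}} API or the actual fix.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model; NOT Flink's SlotPool.
final class ToySlotPool {
    private final Deque<String> pendingRequests = new ArrayDeque<>();
    private final List<String> availableSlots = new ArrayList<>();
    private final Map<String, String> assignments = new LinkedHashMap<>();

    // A request takes an available slot if there is one, otherwise it waits.
    void requestSlot(String requestId) {
        if (!availableSlots.isEmpty()) {
            assignments.put(requestId, availableSlots.remove(0));
        } else {
            pendingRequests.addLast(requestId);
        }
    }

    // An offered slot first serves the oldest pending request, if any.
    // Only when nobody is waiting is it added to the available slots.
    void offerSlot(String slotId) {
        String waiting = pendingRequests.pollFirst();
        if (waiting != null) {
            assignments.put(waiting, slotId);
        } else {
            availableSlots.add(slotId);
        }
    }

    // Returns the slot assigned to the request, or null if it is still pending.
    String assignedSlot(String requestId) {
        return assignments.get(requestId);
    }

    int availableCount() {
        return availableSlots.size();
    }

    public static void main(String[] args) {
        ToySlotPool pool = new ToySlotPool();
        pool.requestSlot("request-1"); // no slot yet, so the request is pending
        pool.offerSlot("slot-1");      // goes straight to the pending request
        System.out.println(pool.assignedSlot("request-1"));
        System.out.println(pool.availableCount());
    }
}
```

The issue describes the opposite of `offerSlot` above: the offered slot is put into the available list unconditionally, so a pending request that could have used it keeps waiting.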
[jira] [Commented] (FLINK-8120) Cannot access Web UI from YARN application overview in FLIP-6 mode
[ https://issues.apache.org/jira/browse/FLINK-8120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291015#comment-16291015 ] ASF GitHub Bot commented on FLINK-8120: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5128 > Cannot access Web UI from YARN application overview in FLIP-6 mode > -- > > Key: FLINK-8120 > URL: https://issues.apache.org/jira/browse/FLINK-8120 > Project: Flink > Issue Type: Bug > Components: YARN >Affects Versions: 1.5.0 >Reporter: Gary Yao >Assignee: Till Rohrmann >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > The Web UI cannot be accessed through YARN's application overview (_Tracking > UI_ link). The proxy displays a stacktrace. > {noformat} > Caused by: > org.apache.http.client.ClientProtocolException > at > org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:888) > at > org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82) > at > org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:107) > at > org.apache.hadoop.yarn.server.webproxy.WebAppProxyServlet.proxyLink(WebAppProxyServlet.java:242) > at > org.apache.hadoop.yarn.server.webproxy.WebAppProxyServlet.methodAction(WebAppProxyServlet.java:461) > at > org.apache.hadoop.yarn.server.webproxy.WebAppProxyServlet.doGet(WebAppProxyServlet.java:290) > at javax.servlet.http.HttpServlet.service(HttpServlet.java:707) > at javax.servlet.http.HttpServlet.service(HttpServlet.java:820) > at > org.mortbay.jetty.servlet.ServletHolder.handle(ServletHolder.java:511) > at > org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1221) > at > com.google.inject.servlet.FilterChainInvocation.doFilter(FilterChainInvocation.java:66) > at > com.sun.jersey.spi.container.servlet.ServletContainer.doFilter(ServletContainer.java:900) > at > com.sun.jersey.spi.container.servlet.ServletContainer.doFilter(ServletContainer.java:834) > 
at > org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppFilter.doFilter(RMWebAppFilter.java:178) > at > com.sun.jersey.spi.container.servlet.ServletContainer.doFilter(ServletContainer.java:795) > at > com.google.inject.servlet.FilterDefinition.doFilter(FilterDefinition.java:163) > at > com.google.inject.servlet.FilterChainInvocation.doFilter(FilterChainInvocation.java:58) > at > com.google.inject.servlet.ManagedFilterPipeline.dispatch(ManagedFilterPipeline.java:118) > at com.google.inject.servlet.GuiceFilter.doFilter(GuiceFilter.java:113) > at > org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212) > at > org.apache.hadoop.http.lib.StaticUserWebFilter$StaticUserFilter.doFilter(StaticUserWebFilter.java:109) > at > org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212) > at > org.apache.hadoop.security.authentication.server.AuthenticationFilter.doFilter(AuthenticationFilter.java:636) > at > org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticationFilter.doFilter(DelegationTokenAuthenticationFilter.java:294) > at > org.apache.hadoop.security.authentication.server.AuthenticationFilter.doFilter(AuthenticationFilter.java:588) > at > org.apache.hadoop.yarn.server.security.http.RMAuthenticationFilter.doFilter(RMAuthenticationFilter.java:82) > at > org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212) > at > org.apache.hadoop.http.HttpServer2$QuotingInputFilter.doFilter(HttpServer2.java:1353) > at > org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212) > at org.apache.hadoop.http.NoCacheFilter.doFilter(NoCacheFilter.java:45) > at > org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212) > at org.apache.hadoop.http.NoCacheFilter.doFilter(NoCacheFilter.java:45) > at > org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212) > at > 
org.mortbay.jetty.servlet.ServletHandler.handle(ServletHandler.java:399) > at > org.mortbay.jetty.security.SecurityHandler.handle(SecurityHandler.java:216) > at > org.mortbay.jetty.servlet.SessionHandler.handle(SessionHandler.java:182) > at > org.mortbay.jetty.handler.ContextHandler.handle(ContextHandler.java:766) > at org.mortbay.jetty.webapp.WebAppContext.handle(WebAppContext.java:450) > at > org.mortbay.jetty.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:230) > at >
[jira] [Commented] (FLINK-8088) Bind logical slots to their request id instead of the slot allocation id
[ https://issues.apache.org/jira/browse/FLINK-8088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291017#comment-16291017 ] ASF GitHub Bot commented on FLINK-8088: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5089 > Bind logical slots to their request id instead of the slot allocation id > > > Key: FLINK-8088 > URL: https://issues.apache.org/jira/browse/FLINK-8088 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > > Since allocated slots can be reused to fulfil multiple slot requests, we > should bind the resulting logical slots to their slot request id instead of > the allocation id of the underlying allocated slot.
[GitHub] flink pull request #5128: [FLINK-8120] [flip6] Register Yarn application wit...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5128 ---
[GitHub] flink pull request #5089: [FLINK-8088] Associate logical slots with the slot...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5089 ---
[GitHub] flink pull request #5088: [FLINK-8087] Decouple Slot from AllocatedSlot
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5088 ---
[GitHub] flink pull request #5090: [FLINK-8089] Also check for other pending slot req...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5090 ---
[jira] [Commented] (FLINK-8089) Fulfil slot requests with unused offered slots
[ https://issues.apache.org/jira/browse/FLINK-8089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291010#comment-16291010 ] ASF GitHub Bot commented on FLINK-8089: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/5090 Thanks for the review @GJL. Merging this PR. > Fulfil slot requests with unused offered slots > -- > > Key: FLINK-8089 > URL: https://issues.apache.org/jira/browse/FLINK-8089 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > > The {{SlotPool}} adds unused offered slots to the list of available slots > without checking whether another pending slot request could be fulfilled with > this slot. This should be changed.
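The behaviour requested in FLINK-8089 can be sketched roughly as follows. This is a minimal illustration and not Flink's actual SlotPool: the class `SketchSlotPool`, its method names, and the use of plain strings as request and slot ids are assumptions made for the example. It shows an unused offered slot being matched against the pending requests first, and only being added to the available slots when nothing is waiting.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified stand-in for a slot pool; not Flink's SlotPool. */
class SketchSlotPool {
    // Requests that could not be served yet, in arrival order.
    private final Deque<String> pendingRequests = new ArrayDeque<>();
    // Request id -> id of the slot that fulfilled it.
    private final Map<String, String> fulfilled = new HashMap<>();
    // Slots that are currently not used by any request.
    private final List<String> availableSlots = new ArrayList<>();

    /** Serves the request from an available slot, or queues it as pending. */
    void requestSlot(String requestId) {
        if (!availableSlots.isEmpty()) {
            fulfilled.put(requestId, availableSlots.remove(0));
        } else {
            pendingRequests.add(requestId);
        }
    }

    /** Handles an offered slot that its original request no longer needs. */
    void offerUnusedSlot(String slotId) {
        // Check for another pending request before parking the slot.
        String waiting = pendingRequests.poll();
        if (waiting != null) {
            fulfilled.put(waiting, slotId);
        } else {
            availableSlots.add(slotId);
        }
    }

    /** Returns the slot assigned to the request, or null if it is still pending. */
    String slotFor(String requestId) {
        return fulfilled.get(requestId);
    }

    int availableCount() {
        return availableSlots.size();
    }
}
```

In this sketch the oldest pending request wins; whether the real implementation also has to check that the slot matches the request's resource profile is not stated in the issue and is left out here.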
[jira] [Commented] (FLINK-8087) Decouple Slot from SlotPool
[ https://issues.apache.org/jira/browse/FLINK-8087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291007#comment-16291007 ] ASF GitHub Bot commented on FLINK-8087: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/5088 Thanks for the review @GJL. Merging this PR. > Decouple Slot from SlotPool > --- > > Key: FLINK-8087 > URL: https://issues.apache.org/jira/browse/FLINK-8087 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > > In order to let the {{SlotPool}} return a different {{LogicalSlot}} > implementation than {{SimpleSlot}} we should not store the {{Slot}} inside of > the {{SlotPool}}. Moreover, we should introduce an abstraction for the > {{AllocatedSlot}} which contains the information required by the > {{SimpleSlot}}. That way we decouple the {{SimpleSlot}} from the > {{AllocatedSlot}}.
[jira] [Commented] (FLINK-8088) Bind logical slots to their request id instead of the slot allocation id
[ https://issues.apache.org/jira/browse/FLINK-8088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16291009#comment-16291009 ] ASF GitHub Bot commented on FLINK-8088: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/5089 Merging this PR. > Bind logical slots to their request id instead of the slot allocation id > > > Key: FLINK-8088 > URL: https://issues.apache.org/jira/browse/FLINK-8088 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > > Since allocated slots can be reused to fulfil multiple slot requests, we > should bind the resulting logical slots to their slot request id instead of > the allocation id of the underlying allocated slot.
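To illustrate why the request id is the better key in FLINK-8088, here is a small sketch. It is not taken from the pull request: `SketchLogicalSlotRegistry` and its methods are hypothetical, and ids are plain strings for brevity. Because one allocated slot can back several logical slots, a map keyed by allocation id could hold only one of them, whereas a map keyed by request id keeps each logical slot addressable on its own.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical registry of logical slots; not part of Flink. */
class SketchLogicalSlotRegistry {
    // Slot request id -> allocation id of the underlying allocated slot.
    private final Map<String, String> logicalSlots = new HashMap<>();

    /** Binds a logical slot to the request that asked for it. */
    void bind(String requestId, String allocationId) {
        logicalSlots.put(requestId, allocationId);
    }

    /** Releases exactly one logical slot; other users of the same allocation stay. */
    boolean release(String requestId) {
        return logicalSlots.remove(requestId) != null;
    }

    /** Counts the logical slots currently backed by the given allocated slot. */
    long countOnAllocation(String allocationId) {
        return logicalSlots.values().stream().filter(allocationId::equals).count();
    }
}
```

With this layout, releasing the logical slot of one request leaves any other logical slot on the same allocation untouched.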
[GitHub] flink issue #5090: [FLINK-8089] Also check for other pending slot requests i...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/5090 Thanks for the review @GJL. Merging this PR. ---
[GitHub] flink issue #5088: [FLINK-8087] Decouple Slot from AllocatedSlot
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/5088 Thanks for the review @GJL. Merging this PR. ---
[GitHub] flink issue #5089: [FLINK-8088] Associate logical slots with the slot reques...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/5089 Merging this PR. ---
[jira] [Commented] (FLINK-8256) Cannot use Scala functions to filter in 1.4 - java.lang.ClassCastException
[ https://issues.apache.org/jira/browse/FLINK-8256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16290978#comment-16290978 ] Ryan Brideau commented on FLINK-8256: - That works for me! Thanks again. > Cannot use Scala functions to filter in 1.4 - java.lang.ClassCastException > -- > > Key: FLINK-8256 > URL: https://issues.apache.org/jira/browse/FLINK-8256 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.4.0 > Environment: macOS, Local Flink v1.4.0, Scala 2.11 >Reporter: Ryan Brideau > > I built the newest release locally today, but when I try to filter a stream > using an anonymous or named function, I get an error. Here's a simple example: > {code:java} > import org.apache.flink.api.java.utils.ParameterTool > import org.apache.flink.streaming.api.scala._ > object TestFunction { > def main(args: Array[String]): Unit = { > val env = StreamExecutionEnvironment.getExecutionEnvironment > val params = ParameterTool.fromArgs(args) > env.getConfig.setGlobalJobParameters(params) > val someArray = Array(1,2,3) > val stream = env.fromCollection(someArray).filter(_ => true) > stream.print().setParallelism(1) > env.execute("Testing Function") > } > } > {code} > This results in: > {code:java} > Job execution switched to status FAILING. > org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot > instantiate user function. 
> at > org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:235) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:355) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:282) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:126) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.ClassCastException: cannot assign instance of > org.peopleinmotion.TestFunction$$anonfun$1 to field > org.apache.flink.streaming.api.scala.DataStream$$anon$7.cleanFun$6 of type > scala.Function1 in instance of > org.apache.flink.streaming.api.scala.DataStream$$anon$7 > at > java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2233) > at > java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1405) > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2288) > at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282) > at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) > at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428) > at > org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290) > at > org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:248) > at > 
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:220) > ... 6 more > 12/13/2017 15:10:01 Job execution switched to status FAILED. > > The program finished with the following exception: > org.apache.flink.client.program.ProgramInvocationException: The program > execution failed: Job execution failed. > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:492) > at > org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:456) > at > org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66) > at > org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:638) > at org.peopleinmotion.TestFunction$.main(TestFunction.scala:20) > at org.peopleinmotion.TestFunction.main(TestFunction.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at >
[jira] [Commented] (FLINK-8256) Cannot use Scala functions to filter in 1.4 - java.lang.ClassCastException
[ https://issues.apache.org/jira/browse/FLINK-8256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16290970#comment-16290970 ] Stephan Ewen commented on FLINK-8256: - Great to hear! Do you think we should close the issue, with a reference to FLINK-8264? (which should make sure this works even when the project setup uses an older quickstart template) > Cannot use Scala functions to filter in 1.4 - java.lang.ClassCastException > -- > > Key: FLINK-8256 > URL: https://issues.apache.org/jira/browse/FLINK-8256 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.4.0 > Environment: macOS, Local Flink v1.4.0, Scala 2.11 >Reporter: Ryan Brideau > > I built the newest release locally today, but when I try to filter a stream > using an anonymous or named function, I get an error. Here's a simple example: > {code:java} > import org.apache.flink.api.java.utils.ParameterTool > import org.apache.flink.streaming.api.scala._ > object TestFunction { > def main(args: Array[String]): Unit = { > val env = StreamExecutionEnvironment.getExecutionEnvironment > val params = ParameterTool.fromArgs(args) > env.getConfig.setGlobalJobParameters(params) > val someArray = Array(1,2,3) > val stream = env.fromCollection(someArray).filter(_ => true) > stream.print().setParallelism(1) > env.execute("Testing Function") > } > } > {code} > This results in: > {code:java} > Job execution switched to status FAILING. > org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot > instantiate user function. 
> at > org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:235) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:355) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:282) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:126) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.ClassCastException: cannot assign instance of > org.peopleinmotion.TestFunction$$anonfun$1 to field > org.apache.flink.streaming.api.scala.DataStream$$anon$7.cleanFun$6 of type > scala.Function1 in instance of > org.apache.flink.streaming.api.scala.DataStream$$anon$7 > at > java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2233) > at > java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1405) > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2288) > at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282) > at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) > at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428) > at > org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290) > at > org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:248) > at > 
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:220) > ... 6 more > 12/13/2017 15:10:01 Job execution switched to status FAILED. > > The program finished with the following exception: > org.apache.flink.client.program.ProgramInvocationException: The program > execution failed: Job execution failed. > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:492) > at > org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:456) > at > org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66) > at > org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:638) > at org.peopleinmotion.TestFunction$.main(TestFunction.scala:20) > at
[jira] [Commented] (FLINK-8264) Add Scala to the parent-first loading patterns
[ https://issues.apache.org/jira/browse/FLINK-8264?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16290961#comment-16290961 ] ASF GitHub Bot commented on FLINK-8264: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5166 @aljoscha I think you may have an opinion on this one > Add Scala to the parent-first loading patterns > -- > > Key: FLINK-8264 > URL: https://issues.apache.org/jira/browse/FLINK-8264 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.4.0 >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.5.0, 1.4.1 > > > A confusing experience happens when users accidentally package the Scala > Library into their jar file. The reversed class loading duplicates Scala's > classes, leading to exceptions like the one below. > By adding {{scala.}} to the default 'parent-first-patterns' we can improve > the user experience in such situations. > Exception Stack Trace: > {code} > java.lang.ClassCastException: cannot assign instance of > org.peopleinmotion.TestFunction$$anonfun$1 to field > org.apache.flink.streaming.api.scala.DataStream$$anon$7.cleanFun$6 of type > scala.Function1 in instance of > org.apache.flink.streaming.api.scala.DataStream$$anon$7 > at > java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2233) > at > java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1405) > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2288) > at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282) > at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064) > at 
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) > at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428) > at > org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290) > at > org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:248) > at > org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:220) > ... 6 more > {code}
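The effect of adding {{scala.}} to the parent-first patterns in FLINK-8264 can be sketched with a simple prefix check. This is an assumption-laden illustration rather than Flink's class loader: the class `SketchResolveOrder`, the method `isParentFirst`, and the example pattern lists are hypothetical. The idea is that a class whose name starts with one of the patterns is resolved through the parent class loader first, so a Scala library packaged into the user jar would no longer produce a second copy of {{scala.Function1}}.

```java
import java.util.List;

/** Hypothetical helper showing prefix-based parent-first matching; not Flink code. */
class SketchResolveOrder {
    /**
     * Returns true if the class should be loaded from the parent class loader
     * before the user-code jar is consulted.
     */
    static boolean isParentFirst(String className, List<String> parentFirstPatterns) {
        for (String pattern : parentFirstPatterns) {
            // Patterns are treated as plain name prefixes, e.g. "scala.".
            if (className.startsWith(pattern)) {
                return true;
            }
        }
        return false;
    }
}
```

With an illustrative list such as `java.` and `org.apache.flink.`, the name `scala.Function1` does not match and would be resolved child-first; once `scala.` is in the list it matches, while user classes such as `org.peopleinmotion.TestFunction` are still loaded from the user jar.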
[GitHub] flink issue #5166: [FLINK-8264] [core] Add 'scala.' to the 'parent-first' cl...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5166 @aljoscha I think you may have an opinion on this one ---
[jira] [Commented] (FLINK-7956) Add support for scheduling with slot sharing
[ https://issues.apache.org/jira/browse/FLINK-7956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16290867#comment-16290867 ] ASF GitHub Bot commented on FLINK-7956: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5091#discussion_r156948978 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java --- @@ -0,0 +1,722 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.SlotContext; +import org.apache.flink.runtime.jobmaster.SlotOwner; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.instance.SlotSharingGroupId; +import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.util.AbstractID; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.AbstractCollection; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +/** + * Manager which is responsible for slot sharing. Slot sharing allows to run different + * tasks in the same slot and to realize co-location constraints. + * + * The SlotSharingManager allows to create a hierarchy of {@link TaskSlot} such that + * every {@link TaskSlot} is uniquely identified by a {@link SlotRequestId} identifying + * the request for the TaskSlot and a {@link AbstractID} identifying the task or the + * co-location constraint running in this slot. + * + * The {@link TaskSlot} hierarchy is implemented by {@link MultiTaskSlot} and + * {@link SingleTaskSlot}. The former class represents inner nodes which can contain + * a number of other {@link TaskSlot} and the latter class represents the leave nodes. + * The hierarchy starts with a root {@link MultiTaskSlot} which is a future + * {@link SlotContext} assigned. The {@link SlotContext} represents the allocated slot + * on the TaskExecutor in which all slots of this hierarchy run. 
A {@link MultiTaskSlot} + * can be assigned multiple {@link SingleTaskSlot} or {@link MultiTaskSlot} if and only if + * the task slot does not yet contain another child with the same {@link AbstractID} identifying + * the actual task or the co-location constraint. + * + * Normal slot sharing is represented by a root {@link MultiTaskSlot} which contains a set + * of {@link SingleTaskSlot} on the second layer. Each {@link SingleTaskSlot} represents a different + * task. + * + * Co-location constraints are modeled by adding a {@link MultiTaskSlot} to the root node. The co-location + * constraint is uniquely identified by a {@link AbstractID} such that we cannot add a second co-located + * {@link MultiTaskSlot} to the same root node. Now all co-located tasks will be added to co-located + * multi task slot. + */ +public class SlotSharingManager { + + private final SlotSharingGroupId slotSharingGroupId; + + // needed to release allocated slots after a complete multi task slot hierarchy has been released + private final AllocatedSlotActions allocatedSlotActions; + + // owner of the slots to which to return them when they are released from the outside + private final SlotOwner slotOwner; + + private final MapallTaskSlots; + + // Root nodes which have not been completed because
[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5091#discussion_r156948978 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java --- @@ -0,0 +1,722 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.SlotContext; +import org.apache.flink.runtime.jobmaster.SlotOwner; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.instance.SlotSharingGroupId; +import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.util.AbstractID; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.AbstractCollection; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +/** + * Manager which is responsible for slot sharing. Slot sharing allows to run different + * tasks in the same slot and to realize co-location constraints. + * + * The SlotSharingManager allows to create a hierarchy of {@link TaskSlot} such that + * every {@link TaskSlot} is uniquely identified by a {@link SlotRequestId} identifying + * the request for the TaskSlot and a {@link AbstractID} identifying the task or the + * co-location constraint running in this slot. + * + * The {@link TaskSlot} hierarchy is implemented by {@link MultiTaskSlot} and + * {@link SingleTaskSlot}. The former class represents inner nodes which can contain + * a number of other {@link TaskSlot} and the latter class represents the leave nodes. + * The hierarchy starts with a root {@link MultiTaskSlot} which is a future + * {@link SlotContext} assigned. The {@link SlotContext} represents the allocated slot + * on the TaskExecutor in which all slots of this hierarchy run. 
A {@link MultiTaskSlot} + * can be assigned multiple {@link SingleTaskSlot} or {@link MultiTaskSlot} if and only if + * the task slot does not yet contain another child with the same {@link AbstractID} identifying + * the actual task or the co-location constraint. + * + * Normal slot sharing is represented by a root {@link MultiTaskSlot} which contains a set + * of {@link SingleTaskSlot} on the second layer. Each {@link SingleTaskSlot} represents a different + * task. + * + * Co-location constraints are modeled by adding a {@link MultiTaskSlot} to the root node. The co-location + * constraint is uniquely identified by a {@link AbstractID} such that we cannot add a second co-located + * {@link MultiTaskSlot} to the same root node. Now all co-located tasks will be added to co-located + * multi task slot. + */ +public class SlotSharingManager { + + private final SlotSharingGroupId slotSharingGroupId; + + // needed to release allocated slots after a complete multi task slot hierarchy has been released + private final AllocatedSlotActions allocatedSlotActions; + + // owner of the slots to which to return them when they are released from the outside + private final SlotOwner slotOwner; + + private final MapallTaskSlots; + + // Root nodes which have not been completed because the allocated slot is still pending + private final Map unresolvedRootSlots; + + // Root nodes which have been completed (the underlying allocated slot has been assigned) + private final
[jira] [Comment Edited] (FLINK-8256) Cannot use Scala functions to filter in 1.4 - java.lang.ClassCastException
[ https://issues.apache.org/jira/browse/FLINK-8256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16290846#comment-16290846 ] Ryan Brideau edited comment on FLINK-8256 at 12/14/17 1:45 PM: --- Thanks for looking into this so quickly. I managed to track down the root of the issue on my end. I had built my project previously using the snapshot archetype, and not the newest 1.4.0 one: {code:java} mvn archetype:generate \ -DarchetypeGroupId=org.apache.flink \ -DarchetypeArtifactId=flink-quickstart-scala \ -DarchetypeVersion=1.4-SNAPSHOT {code} To fix the problem I just built a new empty project using the 1.4.0 archetype version and did a diff of the pom.xml of the two, updating my existing one to match the new one, and now everything works perfectly. I suspect that anybody who made a project recently might find themselves in the same situation. was (Author: brideau): Thanks for looking into this so quickly. I managed to track down the root of the issue on my end. I had built my project previously using the snapshot archetype, and not the newest 1.4.0 one: {code:java} mvn archetype:generate \ -DarchetypeGroupId=org.apache.flink \ -DarchetypeArtifactId=flink-quickstart-scala \ -DarchetypeVersion=1.4-SNAPSHOT {code} To fix the problem I just build a new empty project using the 1.4.0 archetype version and did a diff of the pom.xml of the two, updating my existing one to match the new one, and now everything works perfectly. I suspect that anybody who made a project recently might find themselves in the same situation. 
> Cannot use Scala functions to filter in 1.4 - java.lang.ClassCastException > -- > > Key: FLINK-8256 > URL: https://issues.apache.org/jira/browse/FLINK-8256 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.4.0 > Environment: macOS, Local Flink v1.4.0, Scala 2.11 >Reporter: Ryan Brideau > > I built the newest release locally today, but when I try to filter a stream > using an anonymous or named function, I get an error. Here's a simple example: > {code:java} > import org.apache.flink.api.java.utils.ParameterTool > import org.apache.flink.streaming.api.scala._ > object TestFunction { > def main(args: Array[String]): Unit = { > val env = StreamExecutionEnvironment.getExecutionEnvironment > val params = ParameterTool.fromArgs(args) > env.getConfig.setGlobalJobParameters(params) > val someArray = Array(1,2,3) > val stream = env.fromCollection(someArray).filter(_ => true) > stream.print().setParallelism(1) > env.execute("Testing Function") > } > } > {code} > This results in: > {code:java} > Job execution switched to status FAILING. > org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot > instantiate user function. 
> at > org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:235) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:355) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:282) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:126) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.ClassCastException: cannot assign instance of > org.peopleinmotion.TestFunction$$anonfun$1 to field > org.apache.flink.streaming.api.scala.DataStream$$anon$7.cleanFun$6 of type > scala.Function1 in instance of > org.apache.flink.streaming.api.scala.DataStream$$anon$7 > at > java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2233) > at > java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1405) > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2288) > at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282) > at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) > at
[jira] [Commented] (FLINK-8256) Cannot use Scala functions to filter in 1.4 - java.lang.ClassCastException
[ https://issues.apache.org/jira/browse/FLINK-8256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16290846#comment-16290846 ] Ryan Brideau commented on FLINK-8256: - Thanks for looking into this so quickly. I managed to track down the root of the issue on my end. I had built my project previously using the snapshot archetype, and not the newest 1.4.0 one: {code:java} mvn archetype:generate \ -DarchetypeGroupId=org.apache.flink \ -DarchetypeArtifactId=flink-quickstart-scala \ -DarchetypeVersion=1.4-SNAPSHOT {code} To fix the problem I just built a new empty project using the 1.4.0 archetype version and did a diff of the pom.xml of the two, updating my existing one to match the new one, and now everything works perfectly. I suspect that anybody who created a project recently might find themselves in the same situation. > Cannot use Scala functions to filter in 1.4 - java.lang.ClassCastException > -- > > Key: FLINK-8256 > URL: https://issues.apache.org/jira/browse/FLINK-8256 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.4.0 > Environment: macOS, Local Flink v1.4.0, Scala 2.11 >Reporter: Ryan Brideau > > I built the newest release locally today, but when I try to filter a stream > using an anonymous or named function, I get an error. Here's a simple example: > {code:java} > import org.apache.flink.api.java.utils.ParameterTool > import org.apache.flink.streaming.api.scala._ > object TestFunction { > def main(args: Array[String]): Unit = { > val env = StreamExecutionEnvironment.getExecutionEnvironment > val params = ParameterTool.fromArgs(args) > env.getConfig.setGlobalJobParameters(params) > val someArray = Array(1,2,3) > val stream = env.fromCollection(someArray).filter(_ => true) > stream.print().setParallelism(1) > env.execute("Testing Function") > } > } > {code} > This results in: > {code:java} > Job execution switched to status FAILING. 
> org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot > instantiate user function. > at > org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:235) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:355) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:282) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:126) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.ClassCastException: cannot assign instance of > org.peopleinmotion.TestFunction$$anonfun$1 to field > org.apache.flink.streaming.api.scala.DataStream$$anon$7.cleanFun$6 of type > scala.Function1 in instance of > org.apache.flink.streaming.api.scala.DataStream$$anon$7 > at > java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2233) > at > java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1405) > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2288) > at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282) > at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) > at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428) > at > org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290) > at > 
org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:248) > at > org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:220) > ... 6 more > 12/13/2017 15:10:01 Job execution switched to status FAILED. > > The program finished with the following exception: > org.apache.flink.client.program.ProgramInvocationException: The program > execution failed: Job execution failed. > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:492) > at >
[jira] [Commented] (FLINK-8200) RocksDBAsyncSnapshotTest should use temp fold instead of fold with fixed name
[ https://issues.apache.org/jira/browse/FLINK-8200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16290830#comment-16290830 ] ASF GitHub Bot commented on FLINK-8200: --- Github user wenlong88 closed the pull request at: https://github.com/apache/flink/pull/5122 > RocksDBAsyncSnapshotTest should use temp fold instead of fold with fixed name > - > > Key: FLINK-8200 > URL: https://issues.apache.org/jira/browse/FLINK-8200 > Project: Flink > Issue Type: Bug >Reporter: Wenlong Lyu >Assignee: Wenlong Lyu > > The following test case fails when different users run the test on the same > machine. > Tests run: 4, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 3.226 sec <<< > FAILURE! - in > org.apache.flink.contrib.streaming.state.RocksDBAsyncSnapshotTest > testCleanupOfSnapshotsInFailureCase(org.apache.flink.contrib.streaming.state.RocksDBAsyncSnapshotTest) > Time elapsed: 0.023 sec <<< ERROR! > java.io.IOException: No local storage directories available. Local DB files > directory 'file:/tmp/foobar' does not exist and cannot be created. > at > org.apache.flink.contrib.streaming.state.RocksDBStateBackend.lazyInitializeForJob(RocksDBStateBackend.java:251) > at > org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:300) > at > org.apache.flink.contrib.streaming.state.RocksDBAsyncSnapshotTest.testCleanupOfSnapshotsInFailureCase(RocksDBAsyncSnapshotTest.java:338) -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5122: [FLINK-8200] RocksDBAsyncSnapshotTest should use t...
Github user wenlong88 closed the pull request at: https://github.com/apache/flink/pull/5122 ---
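The idea in the FLINK-8200 title (a temp folder instead of a folder with a fixed name) can be sketched with the JDK alone. This is a minimal illustration, not the change from pull request #5122: the class name `TempDirSketch`, the method `newLocalDbDir`, and the `rocksdb-test-` prefix are all hypothetical. It assumes the failure comes from several users sharing the fixed path `/tmp/foobar`, which is what the issue description suggests.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

class TempDirSketch {

    // Hypothetical helper: returns a freshly created, uniquely named directory
    // under java.io.tmpdir instead of a shared fixed path such as /tmp/foobar.
    static Path newLocalDbDir() throws IOException {
        return Files.createTempDirectory("rocksdb-test-");
    }

    public static void main(String[] args) throws IOException {
        Path first = newLocalDbDir();
        Path second = newLocalDbDir();
        try {
            // Each call yields its own directory, so two test runs do not collide.
            System.out.println("first:  " + first);
            System.out.println("second: " + second);
            System.out.println("distinct: " + !first.equals(second));
            System.out.println("writable: " + Files.isWritable(first));
        } finally {
            // Clean up the (empty) directories created by this sketch.
            Files.deleteIfExists(first);
            Files.deleteIfExists(second);
        }
    }
}
```

In a JUnit test the same effect is usually obtained through the framework's temporary-folder support, which also handles cleanup; the sketch above only shows why a unique directory per run avoids the collision.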
[jira] [Closed] (FLINK-8262) IndividualRestartsConcurrencyTest.testLocalFailureFailsPendingCheckpoints fails on Travis
[ https://issues.apache.org/jira/browse/FLINK-8262?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-8262. Resolution: Fixed Fixed via e80dd8ea3fef0398048a40c3ffd5136bef204b80 > IndividualRestartsConcurrencyTest.testLocalFailureFailsPendingCheckpoints > fails on Travis > - > > Key: FLINK-8262 > URL: https://issues.apache.org/jira/browse/FLINK-8262 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Critical > Labels: test-stability > Fix For: 1.5.0 > > > The > {{IndividualRestartsConcurrencyTest.testLocalFailureFailsPendingCheckpoints}} > fails on Travis. The reason is a concurrent restart attempt which fails and > thus discards all pending checkpoints. > https://travis-ci.org/tillrohrmann/flink/jobs/316300683 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8264) Add Scala to the parent-first loading patterns
[ https://issues.apache.org/jira/browse/FLINK-8264?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16290810#comment-16290810 ] ASF GitHub Bot commented on FLINK-8264: --- GitHub user StephanEwen opened a pull request: https://github.com/apache/flink/pull/5167 [FLINK-8264] [core] Add 'scala.' to the 'parent-first' classloading patterns **BACKPORT of #5166 to release-1.4** ## What is the purpose of the change Adding `scala.` to the "parent-first-patterns" makes sure that Scala classes are not duplicated through "child-first" classloading when users accidentally package the Scala Library into the application jar. Since Scala classes traverse the boundary between core and user space, they should never be duplicated. ## Brief change log - Adds `scala.` to the default value of `classloader.parent-first-patterns`. ## Verifying this change This change can be verified as follows: - Create a very simple quickstart Scala project using a Scala lambda for a filter function (`_ => true`). - Package it such that the Scala library is in the user code jar. - Without the fix, you get a weird class cast exception during deserialization; with this fix, everything is fine. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? 
(**not applicable** / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/StephanEwen/incubator-flink 8264_backport Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5167.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5167 commit 8bd70c1e3d31f5c285ac5995504e52e39063e90b Author: Stephan Ewen Date: 2017-12-14T12:50:39Z [FLINK-8264] [core] Add 'scala.' to the 'parent-first' classloading patterns. > Add Scala to the parent-first loading patterns > -- > > Key: FLINK-8264 > URL: https://issues.apache.org/jira/browse/FLINK-8264 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.4.0 >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.5.0, 1.4.1 > > > A confusing experience happens when users accidentally package the Scala > Library into their jar file. The reversed class loading duplicates Scala's > classes, leading to exceptions like the one below. > By adding {{scala.}} to the default 'parent-first-patterns' we can improve > the user experience in such situations. 
> Exception Stack Trace: > {code} > java.lang.ClassCastException: cannot assign instance of > org.peopleinmotion.TestFunction$$anonfun$1 to field > org.apache.flink.streaming.api.scala.DataStream$$anon$7.cleanFun$6 of type > scala.Function1 in instance of > org.apache.flink.streaming.api.scala.DataStream$$anon$7 > at > java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2233) > at > java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1405) > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2288) > at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282) > at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) > at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428) > at > org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290) > at >
[GitHub] flink pull request #5167: [FLINK-8264] [core] Add 'scala.' to the 'parent-fi...
GitHub user StephanEwen opened a pull request: https://github.com/apache/flink/pull/5167 [FLINK-8264] [core] Add 'scala.' to the 'parent-first' classloading patterns **BACKPORT of #5166 to release-1.4** ## What is the purpose of the change Adding `scala.` to the "parent-first-patterns" makes sure that Scala classes are not duplicated through "child-first" classloading when users accidentally package the Scala Library into the application jar. Since Scala classes traverse the boundary between core and user space, they should never be duplicated. ## Brief change log - Adds `scala.` to the default value of `classloader.parent-first-patterns`. ## Verifying this change This change can be verified as follows: - Create a very simple quickstart Scala project using a Scala lambda for a filter function (`_ => true`). - Package it such that the Scala library is in the user code jar. - Without the fix, you get a weird class cast exception during deserialization; with this fix, everything is fine. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? 
(**not applicable** / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/StephanEwen/incubator-flink 8264_backport Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5167.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5167 commit 8bd70c1e3d31f5c285ac5995504e52e39063e90b Author: Stephan Ewen Date: 2017-12-14T12:50:39Z [FLINK-8264] [core] Add 'scala.' to the 'parent-first' classloading patterns. ---
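The effect of a parent-first pattern on child-first classloading, as described in the pull request above, can be illustrated with a small sketch. This is not Flink's implementation: the class `ChildFirstSketch`, the method `isParentFirst`, and the pattern list used in `main` are hypothetical, and the list is not Flink's actual default for `classloader.parent-first-patterns`. The sketch only shows the decision being discussed: a class whose name starts with a listed prefix (for example `scala.`) is always taken from the parent, so a copy packaged in the user jar is never loaded a second time.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Arrays;
import java.util.List;

class ChildFirstSketch extends URLClassLoader {

    // Prefixes that must always be resolved by the parent (illustrative values only).
    private final List<String> parentFirstPatterns;

    ChildFirstSketch(URL[] urls, ClassLoader parent, List<String> parentFirstPatterns) {
        super(urls, parent);
        this.parentFirstPatterns = parentFirstPatterns;
    }

    // True if the class name starts with one of the parent-first prefixes.
    boolean isParentFirst(String className) {
        for (String prefix : parentFirstPatterns) {
            if (className.startsWith(prefix)) {
                return true;
            }
        }
        return false;
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        synchronized (getClassLoadingLock(name)) {
            Class<?> c = findLoadedClass(name);
            if (c == null) {
                if (isParentFirst(name)) {
                    // Shared classes: standard delegation, parent is asked first.
                    c = super.loadClass(name, false);
                } else {
                    try {
                        // Child-first: look in the user jar URLs before the parent.
                        c = findClass(name);
                    } catch (ClassNotFoundException e) {
                        // Not in the user jar: fall back to the parent.
                        c = super.loadClass(name, false);
                    }
                }
            }
            if (resolve) {
                resolveClass(c);
            }
            return c;
        }
    }

    public static void main(String[] args) throws Exception {
        List<String> patterns = Arrays.asList("java.", "scala.");
        ClassLoader parent = ChildFirstSketch.class.getClassLoader();
        try (ChildFirstSketch loader = new ChildFirstSketch(new URL[0], parent, patterns)) {
            System.out.println(loader.isParentFirst("scala.Function1"));
            System.out.println(loader.isParentFirst("org.peopleinmotion.TestFunction"));
            System.out.println(loader.loadClass("java.lang.String") == String.class);
        }
    }
}
```

Without `scala.` in the list, a `scala.Function1` found in the user jar would be defined by the child loader while the framework's own classes refer to the parent's `scala.Function1`. Two classes with the same name but different defining loaders are not assignable to each other, which is consistent with the `ClassCastException` reported in FLINK-8256.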