[GitHub] [flink] lometheus commented on pull request #14673: [FLINK-20963][python] Update example with the latest recommended API
lometheus commented on pull request #14673: URL: https://github.com/apache/flink/pull/14673#issuecomment-762058823 @flinkbot run azure This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (FLINK-21006) HBaseTablePlanTest tests failed in hadoop 3.1.3 with "java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Objec
[ https://issues.apache.org/jira/browse/FLINK-21006?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu reassigned FLINK-21006: --- Assignee: WeiNan Zhao > HBaseTablePlanTest tests failed in hadoop 3.1.3 with > "java.lang.NoSuchMethodError: > com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)V" > - > > Key: FLINK-21006 > URL: https://issues.apache.org/jira/browse/FLINK-21006 > Project: Flink > Issue Type: Bug > Components: Connectors / HBase, Table SQL / Ecosystem >Affects Versions: 1.13.0 >Reporter: Huang Xingbo >Assignee: WeiNan Zhao >Priority: Critical > Labels: test-stability > > [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=12159=logs=ba53eb01-1462-56a3-8e98-0dd97fbcaab5=bfbc6239-57a0-5db0-63f3-41551b4f7d51] > {code:java} > 2021-01-15T22:48:58.1843544Z Caused by: java.lang.NoSuchMethodError: > com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)V > 2021-01-15T22:48:58.1844358Z at > org.apache.hadoop.conf.Configuration.set(Configuration.java:1357) > 2021-01-15T22:48:58.1845035Z at > org.apache.hadoop.conf.Configuration.set(Configuration.java:1338) > 2021-01-15T22:48:58.1845805Z at > org.apache.flink.connector.hbase.options.HBaseOptions.getHBaseConfiguration(HBaseOptions.java:157) > 2021-01-15T22:48:58.1846960Z at > org.apache.flink.connector.hbase1.HBase1DynamicTableFactory.createDynamicTableSource(HBase1DynamicTableFactory.java:73) > 2021-01-15T22:48:58.1848020Z at > org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:119) > 2021-01-15T22:48:58.1848574Z ... 49 more > {code} > The exception seems to be caused by a conflicting guava version on the classpath. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
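The NoSuchMethodError above is the typical symptom of two guava versions on the classpath: Hadoop 3.x is compiled against a guava that has the three-argument `Preconditions.checkArgument(boolean, String, Object)` overload, while an older guava ends up winning at runtime. A minimal, self-contained probe (a hypothetical helper, not part of Flink or Hadoop) can show which jar a class is actually loaded from when debugging such conflicts:

```java
import java.security.CodeSource;

/** Hypothetical diagnostic for NoSuchMethodError caused by classpath version conflicts. */
public class GuavaConflictProbe {

    /** Returns where the named class was loaded from, or a note that it is absent. */
    static String locate(String className) {
        try {
            Class<?> clazz = Class.forName(className);
            CodeSource src = clazz.getProtectionDomain().getCodeSource();
            // Bootstrap classes (e.g. java.lang.String) have no CodeSource.
            return src == null
                    ? "loaded from bootstrap/unknown location"
                    : src.getLocation().toString();
        } catch (ClassNotFoundException e) {
            return "not on the classpath";
        }
    }

    public static void main(String[] args) {
        // On the failing Hadoop 3.1.3 build this would reveal which guava jar wins.
        System.out.println(locate("com.google.common.base.Preconditions"));
    }
}
```

Running this inside the failing test JVM would tell you whether `Preconditions` comes from the guava that Hadoop expects or from an older one pulled in transitively.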
[jira] [Closed] (FLINK-21007) flink sql sink should support updating only some fields of the target table and provide null-value handling options
[ https://issues.apache.org/jira/browse/FLINK-21007?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dawid Wysakowicz closed FLINK-21007. Resolution: Invalid Please use English in JIRA. If you intend to ask a question in Chinese you can use the user...@flink.apache.org mailing [list|https://flink.apache.org/community.html]. > flink sql sink should support updating only some fields of the target table and provide null-value handling options > > > Key: FLINK-21007 > URL: https://issues.apache.org/jira/browse/FLINK-21007 > Project: Flink > Issue Type: Improvement > Components: Connectors / JDBC >Affects Versions: 1.12.2 >Reporter: sky >Priority: Blocker > Fix For: 1.13.0 > > > When using Flink SQL for ETL, the flink sql sink > needs to support updating only some fields of the target table and to provide null-value handling options (1. strip the null-valued fields from the current record and update only the non-null fields in the target table; 2. write null-valued fields to the target table as-is). > eg: > target table > -- > id | name | age | level| > -- > 1 | sky1 | 30 | 1 | > -- > 3 | sky3 | 30 | 2 | > -- > records written to storage after the computation (one column fewer than the target table): > {"id":1,level:2} > {"id":2,level:1} > The computed result should only update the level field by id, must not overwrite the name and age fields with null, and must account for the possibility that the row with id=2 does not exist. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-21007) flink sql sink should support updating only some fields of the target table and provide null-value handling options
[ https://issues.apache.org/jira/browse/FLINK-21007?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17267056#comment-17267056 ] Dawid Wysakowicz edited comment on FLINK-21007 at 1/18/21, 7:53 AM: Please use English in JIRA. If you intend to ask a question in Chinese you can use the user...@flink.apache.org mailing [list|https://flink.apache.org/community.html]. was (Author: dawidwys): Please use English in JIRA. If you intend to ask a question in Chinese you can use the user...@flink.apache.org mailing [list|https://flink.apache.org/community.html. > flink sql sink should support updating only some fields of the target table and provide null-value handling options > > > Key: FLINK-21007 > URL: https://issues.apache.org/jira/browse/FLINK-21007 > Project: Flink > Issue Type: Improvement > Components: Connectors / JDBC >Affects Versions: 1.12.2 >Reporter: sky >Priority: Blocker > Fix For: 1.13.0 > > > When using Flink SQL for ETL, the flink sql sink > needs to support updating only some fields of the target table and to provide null-value handling options (1. strip the null-valued fields from the current record and update only the non-null fields in the target table; 2. write null-valued fields to the target table as-is). > eg: > target table > -- > id | name | age | level| > -- > 1 | sky1 | 30 | 1 | > -- > 3 | sky3 | 30 | 2 | > -- > records written to storage after the computation (one column fewer than the target table): > {"id":1,level:2} > {"id":2,level:1} > The computed result should only update the level field by id, must not overwrite the name and age fields with null, and must account for the possibility that the row with id=2 does not exist. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot edited a comment on pull request #14644: [FLINK-20840][table-planner] Properly transpose projection to join
flinkbot edited a comment on pull request #14644: URL: https://github.com/apache/flink/pull/14644#issuecomment-760117761 ## CI report: * 4e70987ec082764822d1eefa8356454ea6a274b9 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12119) * 4a600a5d2f5abf6bf6df8692aa8dfb1d55c6f1bf UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] rmetzger commented on a change in pull request #14678: [FLINK-20833][runtime] Add pluggable failure listener in job manager
rmetzger commented on a change in pull request #14678: URL: https://github.com/apache/flink/pull/14678#discussion_r559354094 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java ## @@ -98,11 +103,22 @@ public FailureHandlingResult getGlobalFailureHandlingResult(final Throwable caus true); } +/** @param failureListener the failure listener to be registered */ +public void registerFailureListener(FailureListener failureListener) { +if (!failureListeners.contains(failureListener)) { +failureListeners.add(failureListener); Review comment: Isn't HashSet.add() only adding something, if it isn't present already? ## File path: flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.executiongraph.FailureListener ## @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.flink.runtime.executiongraph.DefaultFailureListener Review comment: See comment above. ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java ## @@ -49,6 +51,8 @@ /** Number of all restarts happened since this job is submitted. 
*/ private long numberOfRestarts; +private Set failureListeners; Review comment: ```suggestion private final Set failureListeners; ``` ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java ## @@ -172,6 +176,14 @@ .createInstance(new DefaultExecutionSlotAllocationContext()); this.verticesWaitingForRestart = new HashSet<>(); + +List listeners = + failureListenerFactory.createFailureListener(jobManagerJobMetricGroup); + +for (FailureListener listener : listeners) { +executionFailureHandler.registerFailureListener(listener); +} Review comment: Since this loop executes code not controlled by the framework, I would recommend catching Throwables and returning them as an unrecoverable FailureHandlingResult. ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/FailureListenerFactory.java ## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.plugin.PluginManager; +import org.apache.flink.core.plugin.PluginUtils; +import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; + +import org.apache.flink.shaded.guava18.com.google.common.collect.Iterators; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.ServiceLoader; + +/** Factory class for creating {@link FailureListener} with plugin Manager. */ +public class FailureListenerFactory { +private PluginManager pluginManager; + +public FailureListenerFactory(Configuration configuration) { +this.pluginManager = PluginUtils.createPluginManagerFromRootFolder(configuration); +} + +public List createFailureListener(JobManagerJobMetricGroup metricGroup) { Review comment: I wonder if we can't do the discovery and initialization of the implementations in the constructor? The available implementations won't change
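Two of the review points above can be combined in one short sketch (hypothetical types, not the actual Flink classes): `Set.add` already returns `false` and leaves the set unchanged for duplicates, so the `contains()` pre-check is redundant; and listener callbacks are user code, so notification should guard against Throwables as the reviewer suggests:

```java
import java.util.HashSet;
import java.util.Set;

/** Sketch of a failure-listener registry; names are illustrative, not Flink's. */
public class FailureListenerRegistry {

    interface FailureListener {
        void onFailure(Throwable cause);
    }

    private final Set<FailureListener> failureListeners = new HashSet<>();

    /** HashSet.add is a no-op for duplicates, so no contains() pre-check is needed. */
    public boolean register(FailureListener listener) {
        return failureListeners.add(listener); // false if already registered
    }

    /** Guard user-provided listener code so one bad listener cannot crash handling. */
    public void notifyFailure(Throwable cause) {
        for (FailureListener listener : failureListeners) {
            try {
                listener.onFailure(cause);
            } catch (Throwable t) {
                // In Flink this would surface as an unrecoverable failure-handling
                // result; here we just record it.
                System.err.println("Listener threw: " + t);
            }
        }
    }
}
```

Registering the same listener twice leaves the set with one entry, and a throwing listener does not propagate out of `notifyFailure`.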
[jira] [Commented] (FLINK-20812) Support 'properties.*' option to pass through all the HBase properties
[ https://issues.apache.org/jira/browse/FLINK-20812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17267054#comment-17267054 ] Huang Xingbo commented on FLINK-20812: -- It seems that this PR caused the tests to fail in hadoop 3.1.3. I have created the issue https://issues.apache.org/jira/browse/FLINK-21006 > Support 'properties.*' option to pass through all the HBase properties > -- > > Key: FLINK-20812 > URL: https://issues.apache.org/jira/browse/FLINK-20812 > Project: Flink > Issue Type: Improvement > Components: Connectors / HBase, Table SQL / Ecosystem >Reporter: WeiNan Zhao >Assignee: WeiNan Zhao >Priority: Major > Labels: pull-request-available > Fix For: 1.13.0 > > > When I use a CDH cluster I need Kerberos authentication, which requires > adding some Kerberos parameters for HBase, but the current hbase connector > structure does not provide an entry point for them. I wonder if this can be > changed; if so, I can submit a PR for the hbase connector. > e.g hbase parameters > hbase.security.authentication='kerberos', > hbase.master.kerberos.principal='...', > hbase.kerberos.regionserver.principal='...', > hbase.security.auth.enable = 'true', > hbase.sasl.clientconfig = 'Client' -- This message was sent by Atlassian Jira (v8.3.4#803005)
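The `properties.*` pass-through pattern requested here boils down to stripping a prefix from the table options and forwarding the remainder. A minimal illustration over a plain map (hypothetical helper; the real connector would write the extracted entries into `org.apache.hadoop.conf.Configuration`):

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch of a 'properties.*' pass-through for connector options. */
public class PropertiesPassThrough {

    private static final String PREFIX = "properties.";

    /** Extracts every 'properties.*' option, stripping the prefix; other options are ignored. */
    static Map<String, String> extract(Map<String, String> tableOptions) {
        Map<String, String> hbaseProps = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : tableOptions.entrySet()) {
            if (e.getKey().startsWith(PREFIX)) {
                hbaseProps.put(e.getKey().substring(PREFIX.length()), e.getValue());
            }
        }
        return hbaseProps;
    }
}
```

With this, a DDL option like `'properties.hbase.security.authentication' = 'kerberos'` would reach HBase as `hbase.security.authentication=kerberos`, while connector-level options such as `'connector'` are left alone.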
[jira] [Created] (FLINK-21006) HBaseTablePlanTest tests failed in hadoop 3.1.3 with "java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object
Huang Xingbo created FLINK-21006: Summary: HBaseTablePlanTest tests failed in hadoop 3.1.3 with "java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)V" Key: FLINK-21006 URL: https://issues.apache.org/jira/browse/FLINK-21006 Project: Flink Issue Type: Bug Components: Connectors / HBase, Table SQL / Ecosystem Affects Versions: 1.13.0 Reporter: Huang Xingbo [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=12159=logs=ba53eb01-1462-56a3-8e98-0dd97fbcaab5=bfbc6239-57a0-5db0-63f3-41551b4f7d51] {code:java} 2021-01-15T22:48:58.1843544Z Caused by: java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)V 2021-01-15T22:48:58.1844358Zat org.apache.hadoop.conf.Configuration.set(Configuration.java:1357) 2021-01-15T22:48:58.1845035Zat org.apache.hadoop.conf.Configuration.set(Configuration.java:1338) 2021-01-15T22:48:58.1845805Zat org.apache.flink.connector.hbase.options.HBaseOptions.getHBaseConfiguration(HBaseOptions.java:157) 2021-01-15T22:48:58.1846960Zat org.apache.flink.connector.hbase1.HBase1DynamicTableFactory.createDynamicTableSource(HBase1DynamicTableFactory.java:73) 2021-01-15T22:48:58.1848020Zat org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:119) 2021-01-15T22:48:58.1848574Z... 49 more {code} The exception seems to be caused by a conflicting guava version on the classpath. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-21007) flink sql sink should support updating only some fields of the target table and provide null-value handling options
sky created FLINK-21007: --- Summary: flink sql sink should support updating only some fields of the target table and provide null-value handling options Key: FLINK-21007 URL: https://issues.apache.org/jira/browse/FLINK-21007 Project: Flink Issue Type: Improvement Components: Connectors / JDBC Affects Versions: 1.12.2 Reporter: sky Fix For: 1.13.0 When using Flink SQL for ETL, the flink sql sink needs to support updating only some fields of the target table and to provide null-value handling options (1. strip the null-valued fields from the current record and update only the non-null fields in the target table; 2. write null-valued fields to the target table as-is). eg: target table -- id | name | age | level| -- 1 | sky1 | 30 | 1 | -- 3 | sky3 | 30 | 2 | -- records written to storage after the computation (one column fewer than the target table): {"id":1,level:2} {"id":2,level:1} The computed result should only update the level field by id, must not overwrite the name and age fields with null, and must account for the possibility that the row with id=2 does not exist. -- This message was sent by Atlassian Jira (v8.3.4#803005)
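Outside of Flink, the behaviour this issue asks for can be sketched as SQL generation that skips null fields and relies on upsert semantics for possibly-missing rows. The builder below is a hypothetical illustration (it assumes a MySQL-style `ON DUPLICATE KEY UPDATE` target, a key column named `id`, and ignores edge cases such as all-null records):

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

/** Sketch: build a partial upsert that only writes the non-null fields of a record. */
public class PartialUpsertSqlBuilder {

    static String build(String table, Map<String, Object> record) {
        // 1. Drop null-valued fields so they never overwrite existing data.
        Map<String, Object> nonNull = record.entrySet().stream()
                .filter(e -> e.getValue() != null)
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue,
                        (a, b) -> a, LinkedHashMap::new));
        String cols = String.join(", ", nonNull.keySet());
        String marks = nonNull.keySet().stream()
                .map(k -> "?").collect(Collectors.joining(", "));
        // 2. Upsert handles the "row may not exist yet" case (e.g. id=2 above).
        String updates = nonNull.keySet().stream()
                .filter(k -> !k.equals("id"))
                .map(k -> k + " = VALUES(" + k + ")")
                .collect(Collectors.joining(", "));
        return "INSERT INTO " + table + " (" + cols + ") VALUES (" + marks + ")"
                + " ON DUPLICATE KEY UPDATE " + updates;
    }

    public static void main(String[] args) {
        Map<String, Object> rec = new LinkedHashMap<>();
        rec.put("id", 1);
        rec.put("name", null); // must not clobber the stored name
        rec.put("level", 2);
        System.out.println(build("target", rec));
    }
}
```

For the record `{"id":1,"name":null,"level":2}` this produces an upsert touching only `id` and `level`, leaving `name` and `age` intact.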
[GitHub] [flink] leonardBang commented on a change in pull request #14644: [FLINK-20840][table-planner] Properly transpose projection to join
leonardBang commented on a change in pull request #14644: URL: https://github.com/apache/flink/pull/14644#discussion_r559362690 ## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java ## @@ -83,8 +83,8 @@ @Internal public class PlannerContext { -private final RelDataTypeSystem typeSystem = new FlinkTypeSystem(); -private final FlinkTypeFactory typeFactory = new FlinkTypeFactory(typeSystem); +private final RelDataTypeSystem typeSystem; Review comment: Oops, these should only appear in another development branch of mine.
[GitHub] [flink] flinkbot edited a comment on pull request #14663: [FLINK-20925][table-planner-blink] Separate implementation of StreamExecLookupJoin and BatchExecLookupJoin
flinkbot edited a comment on pull request #14663: URL: https://github.com/apache/flink/pull/14663#issuecomment-760769392 ## CI report: * 852af6f79074f5d25ef8194963c464feb0038787 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12163) * 606f33587dd148ad9e2dc4c9fb91986e4c995bc3 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12168)
[GitHub] [flink] leonardBang commented on pull request #14675: [FLINK-18383][docs]translate "JDBC SQL Connector" documentation into …
leonardBang commented on pull request #14675: URL: https://github.com/apache/flink/pull/14675#issuecomment-762042289 I'd like to take a look at this.
[GitHub] [flink] flinkbot edited a comment on pull request #14663: [FLINK-20925][table-planner-blink] Separate implementation of StreamExecLookupJoin and BatchExecLookupJoin
flinkbot edited a comment on pull request #14663: URL: https://github.com/apache/flink/pull/14663#issuecomment-760769392 ## CI report: * 852af6f79074f5d25ef8194963c464feb0038787 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12163) * 606f33587dd148ad9e2dc4c9fb91986e4c995bc3 UNKNOWN
[GitHub] [flink] godfreyhe commented on a change in pull request #14663: [FLINK-20925][table-planner-blink] Separate implementation of StreamExecLookupJoin and BatchExecLookupJoin
godfreyhe commented on a change in pull request #14663: URL: https://github.com/apache/flink/pull/14663#discussion_r559352791 ## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/utils/LookupJoinUtil.java ## @@ -58,4 +71,50 @@ public FieldRefLookupKey(int index) { private LookupJoinUtil() { // no instantiation } + +/** Gets LookupFunction from temporal table according to the given lookup keys. */ +public static UserDefinedFunction getLookupFunction( +RelOptTable temporalTable, Collection lookupKeys) { + +List lookupKeyIndicesInOrder = new ArrayList<>(lookupKeys); +lookupKeyIndicesInOrder.sort(Integer::compareTo); Review comment: nit: `CommonExecLookupJoin#getOrderedLookupKeys` can be moved into `LookupJoinUtil` and then this method can reuse it.
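The refactor the reviewer suggests, hoisting `getOrderedLookupKeys` into `LookupJoinUtil` so both call sites share it, boils down to one small pure function; a sketch (illustrative class name):

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/** Sketch of the shared helper: order lookup key indices ascending. */
public class LookupKeyOrdering {

    /** Returns the lookup key indices sorted ascending, as the diff above does inline. */
    static List<Integer> getOrderedLookupKeys(Collection<Integer> lookupKeys) {
        List<Integer> ordered = new ArrayList<>(lookupKeys);
        ordered.sort(Integer::compareTo);
        return ordered;
    }
}
```

Extracting it lets `getLookupFunction` and the exec-node code sort keys identically instead of duplicating the two-line idiom.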
[jira] [Created] (FLINK-21005) Introduce new provider for unified Sink API and implement in planner
Jark Wu created FLINK-21005: --- Summary: Introduce new provider for unified Sink API and implement in planner Key: FLINK-21005 URL: https://issues.apache.org/jira/browse/FLINK-21005 Project: Flink Issue Type: Sub-task Components: Table SQL / API, Table SQL / Planner Reporter: Jark Wu Fix For: 1.13.0 FLIP-143 [1] introduced the unified sink API, we should add a {{SinkRuntimeProvider}} for it and support it in planner. So that Table SQL users can also use the unified sink APIs. [1]: https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot edited a comment on pull request #14665: [FLINK-20858][python][table-planner-blink] Port StreamExecPythonGroupWindowAggregate and BatchExecPythonGroupWindowAggregate to Java
flinkbot edited a comment on pull request #14665: URL: https://github.com/apache/flink/pull/14665#issuecomment-760792355 ## CI report: * b1df7f6879a63390c8f252db85f8f338a4f29427 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12113) * 23dd7aa78998633facc8b4282443fa4a7816613f Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12167) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #14665: [FLINK-20858][python][table-planner-blink] Port StreamExecPythonGroupWindowAggregate and BatchExecPythonGroupWindowAggregate to Java
flinkbot edited a comment on pull request #14665: URL: https://github.com/apache/flink/pull/14665#issuecomment-760792355 ## CI report: * b1df7f6879a63390c8f252db85f8f338a4f29427 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12113) * 23dd7aa78998633facc8b4282443fa4a7816613f UNKNOWN
[jira] [Updated] (FLINK-21004) test_process_mode_boot.py test hangs
[ https://issues.apache.org/jira/browse/FLINK-21004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Huang Xingbo updated FLINK-21004: - Summary: test_process_mode_boot.py test hangs (was: test_process_mode_boot.py tes hangs) > test_process_mode_boot.py test hangs > > > Key: FLINK-21004 > URL: https://issues.apache.org/jira/browse/FLINK-21004 > Project: Flink > Issue Type: Bug > Components: API / Python >Affects Versions: 1.13.0 >Reporter: Huang Xingbo >Priority: Major > Labels: test-stability > > [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=12159=logs=821b528f-1eed-5598-a3b4-7f748b13f261=4fad9527-b9a5-5015-1b70-8356e5c91490] > {code:java} > 2021-01-18T00:55:48.3027307Z > pyflink/fn_execution/tests/test_process_mode_boot.py > == > 2021-01-18T00:55:48.3028351Z Process produced no output for 900 seconds. > 2021-01-18T00:55:48.3033084Z > == > 2021-01-18T00:55:48.3033633Z > == > 2021-01-18T00:55:48.3034073Z The following Java processes are running (JPS) > 2021-01-18T00:55:48.3037261Z > == > 2021-01-18T00:55:48.3180991Z Picked up JAVA_TOOL_OPTIONS: > -XX:+HeapDumpOnOutOfMemoryError > 2021-01-18T00:55:48.4930672Z 18493 Jps > 2021-01-18T00:55:48.4931189Z 12477 PythonGatewayServer > 2021-01-18T00:55:48.4979543Z > == > 2021-01-18T00:55:48.4984759Z Printing stack trace of Java process 18493 > 2021-01-18T00:55:48.4987182Z > == > 2021-01-18T00:55:48.5025804Z Picked up JAVA_TOOL_OPTIONS: > -XX:+HeapDumpOnOutOfMemoryError > 2021-01-18T00:55:48.5943552Z 18493: No such process > 2021-01-18T00:55:48.6089460Z > == > 2021-01-18T00:55:48.6089977Z Printing stack trace of Java process 12477 > 2021-01-18T00:55:48.6094322Z > == > 2021-01-18T00:55:48.6140780Z Picked up JAVA_TOOL_OPTIONS: > -XX:+HeapDumpOnOutOfMemoryError > 2021-01-18T00:55:48.9394259Z 2021-01-18 00:55:48 > 2021-01-18T00:55:48.9401959Z Full thread dump OpenJDK 64-Bit Server VM > (25.275-b01 mixed mode): > 2021-01-18T00:55:48.9402608Z > 2021-01-18T00:55:48.9403205Z "Attach 
Listener" #3327 daemon prio=9 os_prio=0 > tid=0x7f4d9c02c800 nid=0x4864 waiting on condition [0x] > 2021-01-18T00:55:48.9403817Zjava.lang.Thread.State: RUNNABLE > 2021-01-18T00:55:48.9404137Z > 2021-01-18T00:55:48.9404634Z "process reaper" #2273 daemon prio=10 os_prio=0 > tid=0x7f4db809c000 nid=0x3ff9 runnable [0x7f4d7b07] > 2021-01-18T00:55:48.9405191Zjava.lang.Thread.State: RUNNABLE > 2021-01-18T00:55:48.9405785Z at > java.lang.UNIXProcess.waitForProcessExit(Native Method) > 2021-01-18T00:55:48.9425226Z at > java.lang.UNIXProcess.lambda$initStreams$3(UNIXProcess.java:289) > 2021-01-18T00:55:48.9431982Z at > java.lang.UNIXProcess$$Lambda$1358/33063055.run(Unknown Source) > 2021-01-18T00:55:48.9432532Z at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > 2021-01-18T00:55:48.9439963Z at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > 2021-01-18T00:55:48.9444803Z at java.lang.Thread.run(Thread.java:748) > 2021-01-18T00:55:48.9445051Z > 2021-01-18T00:55:48.9452857Z "process reaper" #2033 daemon prio=10 os_prio=0 > tid=0x7f4db8086800 nid=0x3e2a runnable [0x7f4d7b0e2000] > 2021-01-18T00:55:48.9453377Zjava.lang.Thread.State: RUNNABLE > 2021-01-18T00:55:48.9453743Z at > java.lang.UNIXProcess.waitForProcessExit(Native Method) > 2021-01-18T00:55:48.9454188Z at > java.lang.UNIXProcess.lambda$initStreams$3(UNIXProcess.java:289) > 2021-01-18T00:55:48.9454645Z at > java.lang.UNIXProcess$$Lambda$1358/33063055.run(Unknown Source) > 2021-01-18T00:55:48.9455136Z at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > 2021-01-18T00:55:48.9455669Z at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > 2021-01-18T00:55:48.9456119Z at java.lang.Thread.run(Thread.java:748) > 2021-01-18T00:55:48.9456355Z > 2021-01-18T00:55:48.9456708Z "process reaper" #1923 daemon prio=10 os_prio=0 > tid=0x7f4d9c005800 nid=0x3cef runnable [0x7f4d7808b000] > 
2021-01-18T00:55:48.9457148Zjava.lang.Thread.State: RUNNABLE > 2021-01-18T00:55:48.9457484Z at >
[jira] [Created] (FLINK-21004) test_process_mode_boot.py tes hangs
Huang Xingbo created FLINK-21004: Summary: test_process_mode_boot.py tes hangs Key: FLINK-21004 URL: https://issues.apache.org/jira/browse/FLINK-21004 Project: Flink Issue Type: Bug Components: API / Python Affects Versions: 1.13.0 Reporter: Huang Xingbo [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=12159=logs=821b528f-1eed-5598-a3b4-7f748b13f261=4fad9527-b9a5-5015-1b70-8356e5c91490] {code:java} 2021-01-18T00:55:48.3027307Z pyflink/fn_execution/tests/test_process_mode_boot.py == 2021-01-18T00:55:48.3028351Z Process produced no output for 900 seconds. 2021-01-18T00:55:48.3033084Z == 2021-01-18T00:55:48.3033633Z == 2021-01-18T00:55:48.3034073Z The following Java processes are running (JPS) 2021-01-18T00:55:48.3037261Z == 2021-01-18T00:55:48.3180991Z Picked up JAVA_TOOL_OPTIONS: -XX:+HeapDumpOnOutOfMemoryError 2021-01-18T00:55:48.4930672Z 18493 Jps 2021-01-18T00:55:48.4931189Z 12477 PythonGatewayServer 2021-01-18T00:55:48.4979543Z == 2021-01-18T00:55:48.4984759Z Printing stack trace of Java process 18493 2021-01-18T00:55:48.4987182Z == 2021-01-18T00:55:48.5025804Z Picked up JAVA_TOOL_OPTIONS: -XX:+HeapDumpOnOutOfMemoryError 2021-01-18T00:55:48.5943552Z 18493: No such process 2021-01-18T00:55:48.6089460Z == 2021-01-18T00:55:48.6089977Z Printing stack trace of Java process 12477 2021-01-18T00:55:48.6094322Z == 2021-01-18T00:55:48.6140780Z Picked up JAVA_TOOL_OPTIONS: -XX:+HeapDumpOnOutOfMemoryError 2021-01-18T00:55:48.9394259Z 2021-01-18 00:55:48 2021-01-18T00:55:48.9401959Z Full thread dump OpenJDK 64-Bit Server VM (25.275-b01 mixed mode): 2021-01-18T00:55:48.9402608Z 2021-01-18T00:55:48.9403205Z "Attach Listener" #3327 daemon prio=9 os_prio=0 tid=0x7f4d9c02c800 nid=0x4864 waiting on condition [0x] 2021-01-18T00:55:48.9403817Zjava.lang.Thread.State: RUNNABLE 2021-01-18T00:55:48.9404137Z 2021-01-18T00:55:48.9404634Z "process reaper" #2273 daemon prio=10 os_prio=0 tid=0x7f4db809c000 nid=0x3ff9 runnable [0x7f4d7b07] 
2021-01-18T00:55:48.9405191Zjava.lang.Thread.State: RUNNABLE 2021-01-18T00:55:48.9405785Zat java.lang.UNIXProcess.waitForProcessExit(Native Method) 2021-01-18T00:55:48.9425226Zat java.lang.UNIXProcess.lambda$initStreams$3(UNIXProcess.java:289) 2021-01-18T00:55:48.9431982Zat java.lang.UNIXProcess$$Lambda$1358/33063055.run(Unknown Source) 2021-01-18T00:55:48.9432532Zat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 2021-01-18T00:55:48.9439963Zat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 2021-01-18T00:55:48.9444803Zat java.lang.Thread.run(Thread.java:748) 2021-01-18T00:55:48.9445051Z 2021-01-18T00:55:48.9452857Z "process reaper" #2033 daemon prio=10 os_prio=0 tid=0x7f4db8086800 nid=0x3e2a runnable [0x7f4d7b0e2000] 2021-01-18T00:55:48.9453377Zjava.lang.Thread.State: RUNNABLE 2021-01-18T00:55:48.9453743Zat java.lang.UNIXProcess.waitForProcessExit(Native Method) 2021-01-18T00:55:48.9454188Zat java.lang.UNIXProcess.lambda$initStreams$3(UNIXProcess.java:289) 2021-01-18T00:55:48.9454645Zat java.lang.UNIXProcess$$Lambda$1358/33063055.run(Unknown Source) 2021-01-18T00:55:48.9455136Zat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 2021-01-18T00:55:48.9455669Zat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 2021-01-18T00:55:48.9456119Zat java.lang.Thread.run(Thread.java:748) 2021-01-18T00:55:48.9456355Z 2021-01-18T00:55:48.9456708Z "process reaper" #1923 daemon prio=10 os_prio=0 tid=0x7f4d9c005800 nid=0x3cef runnable [0x7f4d7808b000] 2021-01-18T00:55:48.9457148Zjava.lang.Thread.State: RUNNABLE 2021-01-18T00:55:48.9457484Zat java.lang.UNIXProcess.waitForProcessExit(Native Method) 2021-01-18T00:55:48.9457915Zat java.lang.UNIXProcess.lambda$initStreams$3(UNIXProcess.java:289) 2021-01-18T00:55:48.9458361Zat java.lang.UNIXProcess$$Lambda$1358/33063055.run(Unknown Source) 2021-01-18T00:55:48.9458814Zat 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 2021-01-18T00:55:48.9459364Zat
[GitHub] [flink] HuangXingBo commented on pull request #14665: [FLINK-20858][python][table-planner-blink] Port StreamExecPythonGroupWindowAggregate and BatchExecPythonGroupWindowAggregate to Java
HuangXingBo commented on pull request #14665: URL: https://github.com/apache/flink/pull/14665#issuecomment-762005664 @dianfu Thanks a lot for the review. I have addressed the comments in the latest commit.
[GitHub] [flink] HuangXingBo commented on a change in pull request #14665: [FLINK-20858][python][table-planner-blink] Port StreamExecPythonGroupWindowAggregate and BatchExecPythonGroupWindowAggregate
HuangXingBo commented on a change in pull request #14665: URL: https://github.com/apache/flink/pull/14665#discussion_r559334274 ## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupWindowAggregate.java ## @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.nodes.exec.batch; + +import org.apache.flink.api.dag.Transformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.memory.ManagedMemoryUseCase; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.transformations.OneInputTransformation; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.config.ExecutionConfigOptions; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.functions.python.PythonFunctionInfo; +import org.apache.flink.table.planner.calcite.FlinkRelBuilder; +import org.apache.flink.table.planner.codegen.agg.batch.WindowCodeGenerator; +import org.apache.flink.table.planner.delegation.PlannerBase; +import org.apache.flink.table.planner.expressions.PlannerProctimeAttribute; +import org.apache.flink.table.planner.expressions.PlannerRowtimeAttribute; +import org.apache.flink.table.planner.expressions.PlannerWindowEnd; +import org.apache.flink.table.planner.expressions.PlannerWindowProperty; +import org.apache.flink.table.planner.expressions.PlannerWindowStart; +import org.apache.flink.table.planner.plan.logical.LogicalWindow; +import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNode; +import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecPythonAggregate; +import org.apache.flink.table.planner.plan.nodes.exec.utils.CommonPythonUtil; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.logical.RowType; + +import org.apache.calcite.rel.core.AggregateCall; + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.util.Arrays; + +/** Batch [[ExecNode]] for group widow aggregate 
(Python user defined aggregate function). */ +public class BatchExecPythonGroupWindowAggregate extends CommonExecPythonAggregate +implements BatchExecNode { + +private static final String ARROW_PYTHON_GROUP_WINDOW_AGGREGATE_FUNCTION_OPERATOR_NAME = + "org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch." ++ "BatchArrowPythonGroupWindowAggregateFunctionOperator"; + +private final int[] grouping; +private final int[] groupingSet; +private final AggregateCall[] aggCalls; +private final LogicalWindow window; +private final int inputTimeFieldIndex; +private final FlinkRelBuilder.PlannerNamedWindowProperty[] namedWindowProperties; + +public BatchExecPythonGroupWindowAggregate( +int[] grouping, +int[] groupingSet, +AggregateCall[] aggCalls, +LogicalWindow window, +int inputTimeFieldIndex, +FlinkRelBuilder.PlannerNamedWindowProperty[] namedWindowProperties, +ExecEdge inputEdge, +RowType outputType, +String description) { +super(inputEdge, outputType, description); +this.grouping = grouping; +this.groupingSet = groupingSet; +this.aggCalls = aggCalls; +this.window = window; +this.inputTimeFieldIndex = inputTimeFieldIndex; +this.namedWindowProperties = namedWindowProperties; +} + +@Override +protected Transformation translateToPlanInternal(PlannerBase planner) { +final ExecNode inputNode = (ExecNode) getInputNodes().get(0); +final Transformation inputTransform = inputNode.translateToPlan(planner); +final RowType inputRowType = (RowType) inputNode.getOutputType(); +final RowType outputRowType =
[jira] [Assigned] (FLINK-20997) YarnTestBaseTest fails due to NPE
[ https://issues.apache.org/jira/browse/FLINK-20997?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias reassigned FLINK-20997: Assignee: Paul Lin > YarnTestBaseTest fails due to NPE > - > > Key: FLINK-20997 > URL: https://issues.apache.org/jira/browse/FLINK-20997 > Project: Flink > Issue Type: Bug > Components: Deployment / YARN, Tests >Affects Versions: 1.12.0, 1.11.3 >Reporter: Paul Lin >Assignee: Paul Lin >Priority: Major > Labels: pull-request-available > Attachments: 截屏2021-01-16 17.51.54.png > > > YarnTestBase depends on classpaths generated by the Maven dependency plugin in > the `package` phase, but YarnTestBaseTest is a unit test that is executed in the `test` > phase (which runs before `package`), so it is unable to find `yarn.classpath` > and causes an NPE. -- This message was sent by Atlassian Jira (v8.3.4#803005)
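The phase-ordering bug above can be illustrated with a small sketch (plain Python, not Flink code; the directory name is hypothetical): a test helper that reads a build-generated file must tolerate its absence, because the `test` phase runs before the `package` phase that generates it.

```python
from pathlib import Path

def load_yarn_classpath(base_dir):
    """Return classpath entries from yarn.classpath, or None if never generated."""
    cp_file = Path(base_dir) / "yarn.classpath"
    if not cp_file.exists():  # `test` phase runs before `package`, file may not exist
        return None
    return cp_file.read_text().strip().split(":")

# A graceful None instead of a crash when the file was never generated:
entries = load_yarn_classpath("/tmp/definitely-missing-dir-xyz-123")
assert entries is None
```

The actual fix in Flink is to make the test robust against the missing resource; this sketch only shows the defensive shape of such a check.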
[jira] [Updated] (FLINK-20994) Add public method to create TableEnvironment in PyFlink
[ https://issues.apache.org/jira/browse/FLINK-20994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dian Fu updated FLINK-20994: Fix Version/s: 1.13.0 > Add public method to create TableEnvironment in PyFlink > --- > > Key: FLINK-20994 > URL: https://issues.apache.org/jira/browse/FLINK-20994 > Project: Flink > Issue Type: Improvement > Components: API / Python >Affects Versions: 1.13.0 >Reporter: Seth Wiesman >Priority: Major > Fix For: 1.13.0 > > > The Python Table API offers three table environments: > * TableEnvironment > * StreamTableEnvironment > * BatchTableEnvironment > The `TableEnvironment` pydoc states that it is a unified interface for pure > table applications. However, it is currently missing a public constructor or > `create` method to instantiate it. This method should be added so users can > leverage a unified interface and to better align PyFlink with the JVM table > APIs.
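As a rough sketch of the API shape the issue asks for, a unified `create` factory method might look like the following. This is a hypothetical stand-in, not the actual PyFlink classes; only the `TableEnvironment.create(environment_settings)` shape is taken from the issue.

```python
class EnvironmentSettings:
    """Stand-in for PyFlink's EnvironmentSettings."""
    def __init__(self, streaming_mode):
        self.streaming_mode = streaming_mode

    @staticmethod
    def in_streaming_mode():
        return EnvironmentSettings(streaming_mode=True)

class TableEnvironment:
    """Stand-in sketching the proposed unified entry point."""
    def __init__(self, settings):
        self._settings = settings

    @classmethod
    def create(cls, environment_settings):
        # single public factory for pure Table API programs,
        # mirroring the JVM TableEnvironment.create(...)
        return cls(environment_settings)

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
assert t_env._settings.streaming_mode
```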
[jira] [Commented] (FLINK-13247) Implement external shuffle service for YARN
[ https://issues.apache.org/jira/browse/FLINK-13247?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17267020#comment-17267020 ] Zhijiang commented on FLINK-13247: -- AFAIK, the current assignee [~ssy] will not work on it any more since he has already transferred to a new work position. But I am not sure whether there are other candidates to take over this issue in the future plan. > Implement external shuffle service for YARN > --- > > Key: FLINK-13247 > URL: https://issues.apache.org/jira/browse/FLINK-13247 > Project: Flink > Issue Type: New Feature > Components: Runtime / Network >Reporter: MalcolmSanders >Assignee: MalcolmSanders >Priority: Minor > > Flink batch job users could achieve better cluster utilization and job > throughput through an external shuffle service, because the producers of > intermediate result partitions can be released once the intermediate result > partitions have been persisted on disk. In > [FLINK-10653|https://issues.apache.org/jira/browse/FLINK-10653], [~zjwang] > has introduced a pluggable shuffle manager architecture which abstracts the > process of data transfer between stages from the Flink runtime as a shuffle > service. I propose a YARN implementation of the Flink external shuffle service, > since YARN is widely used in various companies. > The basic idea is as follows: > (1) Producers write intermediate result partitions to local disks assigned by > the NodeManager; > (2) YARN shuffle servers, deployed on each NodeManager as an auxiliary > service, are informed of the intermediate result partition descriptions by the > producers; > (3) Consumers fetch intermediate result partitions from the YARN shuffle servers;
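The three-step flow in the proposal can be sketched as a toy model (hypothetical Python, not the proposed implementation; names and paths are made up): the producer persists a partition locally, registers its descriptor with the per-NodeManager shuffle server, and can then be released while consumers still resolve the data through the server.

```python
class YarnShuffleServer:
    """Toy model of a per-NodeManager auxiliary shuffle service."""
    def __init__(self):
        self._partitions = {}  # partition_id -> local file path on this node

    def register(self, partition_id, local_path):
        # step (2): producer informs the server of the partition descriptor
        self._partitions[partition_id] = local_path

    def fetch(self, partition_id):
        # step (3): consumer resolves the partition through the server
        return self._partitions.get(partition_id)

server = YarnShuffleServer()
# step (1): producer wrote the partition to a NodeManager-assigned local disk
server.register("result-partition-0", "/data/nm-local/rp-0.bin")
# the producer container can now be released; the data stays resolvable
assert server.fetch("result-partition-0") == "/data/nm-local/rp-0.bin"
```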
[jira] [Commented] (FLINK-20933) Config Python Operator Use Managed Memory In Python DataStream
[ https://issues.apache.org/jira/browse/FLINK-20933?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17267018#comment-17267018 ] Dian Fu commented on FLINK-20933: - Fixed in release-1.12 via 78799ac2f4b12b4ce789be6ad39ef829e361e61f > Config Python Operator Use Managed Memory In Python DataStream > -- > > Key: FLINK-20933 > URL: https://issues.apache.org/jira/browse/FLINK-20933 > Project: Flink > Issue Type: Bug > Components: API / Python >Affects Versions: 1.12.0, 1.13.0 >Reporter: Huang Xingbo >Assignee: Huang Xingbo >Priority: Major > Labels: pull-request-available > Fix For: 1.13.0, 1.12.2 > > > Currently, the way to make a `Python DataStream Operator` use managed memory is to > set a hook in the `execute` method of the `Python StreamExecutionEnvironment` that > traverses the `StreamGraph` and sets the `Python Operator` to use managed > memory. > But when the user's job uses `from_data_stream` to convert the `DataStream` > to a `Table`, the `TableEnvironment.execute` method is used at the end rather > than `StreamExecutionEnvironment.execute`, so the `Python DataStream` related > operators will not have `Managed Memory` set.
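A toy sketch of the hook mechanism described in the issue (hypothetical Python, not Flink internals; the classes and field names are made up) shows why jobs that end in `TableEnvironment.execute` bypass it: the marking pass only runs on the `StreamExecutionEnvironment.execute` path.

```python
class Operator:
    def __init__(self, name, is_python):
        self.name, self.is_python = name, is_python
        self.uses_managed_memory = False

def managed_memory_hook(stream_graph):
    """Hook run by StreamExecutionEnvironment.execute(): traverse the
    StreamGraph and mark every Python operator as a managed-memory user."""
    for op in stream_graph:
        if op.is_python:
            op.uses_managed_memory = True

graph = [Operator("python-map", is_python=True), Operator("sink", is_python=False)]
managed_memory_hook(graph)  # only invoked on the DataStream execute() path
assert [op.uses_managed_memory for op in graph] == [True, False]
```

If the job instead finishes through `TableEnvironment.execute`, this hook is never invoked and the Python operators keep `uses_managed_memory = False`, which is the bug the fix addresses.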
[jira] [Updated] (FLINK-20933) Config Python Operator Use Managed Memory In Python DataStream
[ https://issues.apache.org/jira/browse/FLINK-20933?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dian Fu updated FLINK-20933: Fix Version/s: 1.12.2
[GitHub] [flink] dianfu edited a comment on pull request #14676: [FLINK-20933][python] Config Python Operator Use Managed Memory In Python DataStream
dianfu edited a comment on pull request #14676: URL: https://github.com/apache/flink/pull/14676#issuecomment-761985379 Closed via 78799ac2f4b12b4ce789be6ad39ef829e361e61f
[GitHub] [flink] dianfu commented on pull request #14676: [FLINK-20933][python] Config Python Operator Use Managed Memory In Python DataStream
dianfu commented on pull request #14676: URL: https://github.com/apache/flink/pull/14676#issuecomment-761985379 closed via 78799ac2f4b12b4ce789be6ad39ef829e361e61f
[GitHub] [flink] dianfu closed pull request #14676: [FLINK-20933][python] Config Python Operator Use Managed Memory In Python DataStream
dianfu closed pull request #14676: URL: https://github.com/apache/flink/pull/14676
[jira] [Closed] (FLINK-20933) Config Python Operator Use Managed Memory In Python DataStream
[ https://issues.apache.org/jira/browse/FLINK-20933?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dian Fu closed FLINK-20933. --- Fix Version/s: 1.13.0 Assignee: Huang Xingbo Resolution: Fixed Fixed in master via e1d98e2d806493ff921b2984ad2a1eb200835c04
[GitHub] [flink] dianfu closed pull request #14621: [FLINK-20933][python] Config Python Operator Use Managed Memory In Python DataStream
dianfu closed pull request #14621: URL: https://github.com/apache/flink/pull/14621
[jira] [Commented] (FLINK-12358) Verify whether rest documentation needs to be updated when building pull request
[ https://issues.apache.org/jira/browse/FLINK-12358?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17267014#comment-17267014 ] Yun Tang commented on FLINK-12358: -- [~trohrmann] I had a draft two years ago. However, the codebase has changed in those two years. We don't have Travis nowadays and also plan to abandon Jekyll, which would remove all HTML files. In my opinion, if we have moved to Hugo, there would be no difference between HTML and markdown files. Thus, we might not need this feature, as it would not help much once we move to Hugo. > Verify whether rest documentation needs to be updated when building pull > request > --- > > Key: FLINK-12358 > URL: https://issues.apache.org/jira/browse/FLINK-12358 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Yun Tang >Assignee: Yun Tang >Priority: Minor > > Currently, unlike the configuration docs, the REST API docs have no method to > check whether they are up to date with the latest code. This is really annoying and not easy to > track if only checked by developers. > I plan to add a check in Travis to verify whether any files have been updated > by using `git status`.
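The `git status` check the issue proposes can be sketched with a small helper (hypothetical, not Flink's CI code). It parses `git status --porcelain` output, whose lines consist of two status characters, a space, then the path; any non-empty line after regenerating the docs means the committed docs are stale.

```python
def stale_docs(porcelain_output):
    """Return paths reported as changed by `git status --porcelain`
    (two status chars, one space, then the path)."""
    return [line[3:] for line in porcelain_output.splitlines() if line.strip()]

# Sample porcelain output: one modified and one untracked generated file.
sample = " M docs/rest_api.html\n?? docs/new_endpoint.html\n"
changed = stale_docs(sample)
assert changed == ["docs/rest_api.html", "docs/new_endpoint.html"]
# In CI, a non-empty result would fail the build with a "docs out of date" error.
```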
[GitHub] [flink] dianfu commented on a change in pull request #14665: [FLINK-20858][python][table-planner-blink] Port StreamExecPythonGroupWindowAggregate and BatchExecPythonGroupWindowAggregate to Java
dianfu commented on a change in pull request #14665: URL: https://github.com/apache/flink/pull/14665#discussion_r559334274 ## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupWindowAggregate.java ##
[GitHub] [flink] flinkbot edited a comment on pull request #14663: [FLINK-20925][table-planner-blink] Separate implementation of StreamExecLookupJoin and BatchExecLookupJoin
flinkbot edited a comment on pull request #14663: URL: https://github.com/apache/flink/pull/14663#issuecomment-760769392 ## CI report: * 852af6f79074f5d25ef8194963c464feb0038787 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12163) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] godfreyhe commented on a change in pull request #14663: [FLINK-20925][table-planner-blink] Separate implementation of StreamExecLookupJoin and BatchExecLookupJoin
godfreyhe commented on a change in pull request #14663: URL: https://github.com/apache/flink/pull/14663#discussion_r558185519 ## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java ## @@ -0,0 +1,445 @@ +package org.apache.flink.table.planner.plan.nodes.exec.common; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.dag.Transformation; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.streaming.api.datastream.AsyncDataStream; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.streaming.api.functions.async.AsyncFunction; +import org.apache.flink.streaming.api.operators.ProcessOperator; +import org.apache.flink.streaming.api.operators.SimpleOperatorFactory; +import org.apache.flink.streaming.api.operators.StreamOperatorFactory; +import org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorFactory; +import org.apache.flink.streaming.api.transformations.OneInputTransformation; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.config.ExecutionConfigOptions; +import org.apache.flink.table.catalog.DataTypeFactory; +import org.apache.flink.table.connector.source.LookupTableSource; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.conversion.DataStructureConverter; +import org.apache.flink.table.data.conversion.DataStructureConverters; +import org.apache.flink.table.functions.AsyncTableFunction; +import org.apache.flink.table.functions.TableFunction; +import org.apache.flink.table.functions.UserDefinedFunction; +import org.apache.flink.table.planner.calcite.FlinkTypeFactory; +import org.apache.flink.table.planner.codegen.CodeGeneratorContext; +import 
org.apache.flink.table.planner.codegen.LookupJoinCodeGenerator; +import org.apache.flink.table.planner.delegation.PlannerBase; +import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNode; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase; +import org.apache.flink.table.planner.plan.schema.LegacyTableSourceTable; +import org.apache.flink.table.planner.plan.schema.TableSourceTable; +import org.apache.flink.table.planner.plan.utils.LookupJoinUtil; +import org.apache.flink.table.planner.utils.JavaScalaConversionUtil; +import org.apache.flink.table.planner.utils.ShortcutUtils; +import org.apache.flink.table.runtime.collector.TableFunctionCollector; +import org.apache.flink.table.runtime.collector.TableFunctionResultFuture; +import org.apache.flink.table.runtime.generated.GeneratedCollector; +import org.apache.flink.table.runtime.generated.GeneratedFunction; +import org.apache.flink.table.runtime.generated.GeneratedResultFuture; +import org.apache.flink.table.runtime.operators.join.FlinkJoinType; +import org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinRunner; +import org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinWithCalcRunner; +import org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner; +import org.apache.flink.table.runtime.operators.join.lookup.LookupJoinWithCalcRunner; +import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter; +import org.apache.flink.table.runtime.typeutils.InternalSerializers; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.sources.LookupableTableSource; +import org.apache.flink.table.sources.TableSource; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; + +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexProgram; 
+import org.apache.calcite.tools.RelBuilder; +import org.apache.commons.lang3.StringUtils; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import scala.Option; + +/** + * Base {@link ExecNode} for temporal table join which shares most methods. + * + * For a lookup join query: + * + * + * SELECT T.id, T.content, D.age + * FROM T JOIN userTable FOR SYSTEM_TIME AS OF T.proctime AS D + * ON T.content = concat(D.name, '!') AND D.age = 11 AND T.id = D.id + * WHERE D.name LIKE 'Jack%' + * + * + * The LookupJoin physical node encapsulates the following RelNode tree: + * + * + * Join (l.name = r.name) + */ \ + * RelNode Calc (concat(name, "!") as name, name LIKE 'Jack%') + * | + *DimTable (lookup-keys: age=11, id=l.id) + * (age, id, name) + * + * + * + * lookupKeys: [$0=11, $1=l.id] ($0 and $1 is the indexes of age and id in dim table) + * calcOnTemporalTable: calc on
[GitHub] [flink] godfreyhe commented on a change in pull request #14663: [FLINK-20925][table-planner-blink] Separate implementation of StreamExecLookupJoin and BatchExecLookupJoin
godfreyhe commented on a change in pull request #14663: URL: https://github.com/apache/flink/pull/14663#discussion_r558122588 ## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java ##
[jira] [Closed] (FLINK-20966) Rename Stream(/Batch)ExecIntermediateTableScan to Stream(/Batch)PhysicalIntermediateTableScan
[ https://issues.apache.org/jira/browse/FLINK-20966?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] godfrey he closed FLINK-20966. -- Resolution: Fixed Fixed in 1.13.0: 2417d2e4..b0f889a9 > Rename Stream(/Batch)ExecIntermediateTableScan to > Stream(/Batch)PhysicalIntermediateTableScan > - > > Key: FLINK-20966 > URL: https://issues.apache.org/jira/browse/FLINK-20966 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Planner >Reporter: godfrey he >Assignee: godfrey he >Priority: Major > Labels: pull-request-available > Fix For: 1.13.0
[GitHub] [flink] godfreyhe closed pull request #14638: [FLINK-20966][table-planner-blink] Rename Stream(/Batch)ExecIntermediateTableScan to Stream(/Batch)PhysicalIntermediateTableScan
godfreyhe closed pull request #14638: URL: https://github.com/apache/flink/pull/14638
[GitHub] [flink] godfreyhe commented on pull request #14638: [FLINK-20966][table-planner-blink] Rename Stream(/Batch)ExecIntermediateTableScan to Stream(/Batch)PhysicalIntermediateTableScan
godfreyhe commented on pull request #14638: URL: https://github.com/apache/flink/pull/14638#issuecomment-761957050 @flinkbot run azure
[GitHub] [flink] flinkbot edited a comment on pull request #14663: [FLINK-20925][table-planner-blink] Separate implemention of StreamExecLookupJoin and BatchExecLookupJoin
flinkbot edited a comment on pull request #14663: URL: https://github.com/apache/flink/pull/14663#issuecomment-760769392 ## CI report: * 4224ed129067a1550f812e8803c3298d523c2495 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12120) * 852af6f79074f5d25ef8194963c464feb0038787 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12163) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (FLINK-20935) can't write flink configuration to tmp file and add it to local resource in yarn session mode
[ https://issues.apache.org/jira/browse/FLINK-20935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17264565#comment-17264565 ] yuemeng edited comment on FLINK-20935 at 1/18/21, 2:44 AM: --- [~trohrmann] [~fly_in_gis], yes, I agree. I will check all calls of {{registerSingleLocalResource}} to make sure that the local path is qualified. [~trohrmann] [~fly_in_gis], I have checked every place that calls {{registerSingleLocalResource}}; for now, only this place can cause the problem, because the other local files always have a file schema ({{file:///}}). was (Author: yuemeng): [~trohrmann] [~fly_in_gis], yes, I agree. I will check all calls of {{registerSingleLocalResource}} to make sure that the local path is qualified > can't write flink configuration to tmp file and add it to local resource in > yarn session mode > - > > Key: FLINK-20935 > URL: https://issues.apache.org/jira/browse/FLINK-20935 > Project: Flink > Issue Type: Bug > Components: Deployment / YARN >Affects Versions: 1.12.0, 1.13.0 >Reporter: yuemeng >Assignee: yuemeng >Priority: Major > Labels: pull-request-available > > In Flink 1.12.0 or the latest version, when we execute a command such as > bin/yarn-session.sh -n 20 -jm 9096 -nm 4096 -st, the deploy will fail with the > following errors: > {code} > org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't > deploy Yarn session cluster > at > org.apache.flink.yarn.YarnClusterDescriptor.deploySessionCluster(YarnClusterDescriptor.java:411) > at > org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:498) > at > org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$4(FlinkYarnSessionCli.java:730) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754) > at > 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) > at > org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:730) > Caused by: java.io.FileNotFoundException: File does not exist: > /tmp/application_1573723355201_0036-flink-conf.yaml688141408443326132.tmp > at > org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1309) > {code} > When the startAppMaster method in YarnClusterDescriptor is called, it tries > to write the Flink configuration to a tmp file and add it to the local > resources, but the following code causes the tmp file's file system to be > resolved as a distributed file system: > {code} > // Upload the flink configuration > // write out configuration file > File tmpConfigurationFile = null; > try { > tmpConfigurationFile = File.createTempFile(appId + > "-flink-conf.yaml", null); > BootstrapTools.writeConfiguration(configuration, > tmpConfigurationFile); > String flinkConfigKey = "flink-conf.yaml"; > fileUploader.registerSingleLocalResource( > flinkConfigKey, > new > Path(tmpConfigurationFile.getAbsolutePath()), > "", > LocalResourceType.FILE, > true, > true); > > classPathBuilder.append("flink-conf.yaml").append(File.pathSeparator); > } finally { > if (tmpConfigurationFile != null && > !tmpConfigurationFile.delete()) { > LOG.warn("Fail to delete temporary file {}.", > tmpConfigurationFile.toPath()); > } > } > {code} > The {code}tmpConfigurationFile.getAbsolutePath(){code} method returns a path > without a file schema, so the file system will be resolved as a distributed > file system -- This message was sent by Atlassian Jira (v8.3.4#803005)
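The reporter's analysis can be reproduced with plain JDK classes (the class and file names below are illustrative, not Flink code): `File#getAbsolutePath()` yields a scheme-less path, which Hadoop's `Path` then resolves against the configured default file system (possibly HDFS), while `File#toURI()` yields an explicit `file:` URI that is unambiguously local. A minimal sketch:

```java
import java.io.File;
import java.io.IOException;

public class PathSchemeDemo {
    // A scheme-less absolute path: ambiguous, resolved against the
    // default file system by Hadoop's Path.
    public static String unqualified(File f) {
        return f.getAbsolutePath(); // e.g. /tmp/app-flink-conf.yaml
    }

    // An explicit file: URI: always resolved as a local file.
    public static String qualified(File f) {
        return f.toURI().toString(); // e.g. file:/tmp/app-flink-conf.yaml
    }

    public static void main(String[] args) throws IOException {
        File tmp = File.createTempFile("app-flink-conf.yaml", null);
        tmp.deleteOnExit();
        System.out.println(unqualified(tmp));
        System.out.println(qualified(tmp));
    }
}
```

A fix along the lines the ticket suggests would therefore qualify the path before registering it, e.g. by passing something like `new Path(tmpConfigurationFile.toURI())` to `registerSingleLocalResource` instead of the bare absolute path.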
[GitHub] [flink] dianfu commented on a change in pull request #14665: [FLINK-20858][python][table-planner-blink] Port StreamExecPythonGroupWindowAggregate and BatchExecPythonGroupWindowAggregate to Ja
dianfu commented on a change in pull request #14665: URL: https://github.com/apache/flink/pull/14665#discussion_r559287136

## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupWindowAggregate.java

## @@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.batch;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.python.PythonFunctionInfo;
+import org.apache.flink.table.planner.calcite.FlinkRelBuilder;
+import org.apache.flink.table.planner.codegen.agg.batch.WindowCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.expressions.PlannerProctimeAttribute;
+import org.apache.flink.table.planner.expressions.PlannerRowtimeAttribute;
+import org.apache.flink.table.planner.expressions.PlannerWindowEnd;
+import org.apache.flink.table.planner.expressions.PlannerWindowProperty;
+import org.apache.flink.table.planner.expressions.PlannerWindowStart;
+import org.apache.flink.table.planner.plan.logical.LogicalWindow;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecPythonAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.CommonPythonUtil;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.calcite.rel.core.AggregateCall;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Arrays;
+
+/** Batch [[ExecNode]] for group window aggregate (Python user-defined aggregate function). */
+public class BatchExecPythonGroupWindowAggregate extends CommonExecPythonAggregate
+        implements BatchExecNode {
+
+    private static final String ARROW_PYTHON_GROUP_WINDOW_AGGREGATE_FUNCTION_OPERATOR_NAME =
+            "org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch."
+                    + "BatchArrowPythonGroupWindowAggregateFunctionOperator";
+
+    private final int[] grouping;
+    private final int[] groupingSet;
+    private final AggregateCall[] aggCalls;
+    private final LogicalWindow window;
+    private final int inputTimeFieldIndex;
+    private final FlinkRelBuilder.PlannerNamedWindowProperty[] namedWindowProperties;
+
+    public BatchExecPythonGroupWindowAggregate(
+            int[] grouping,
+            int[] groupingSet,
+            AggregateCall[] aggCalls,
+            LogicalWindow window,
+            int inputTimeFieldIndex,
+            FlinkRelBuilder.PlannerNamedWindowProperty[] namedWindowProperties,
+            ExecEdge inputEdge,
+            RowType outputType,
+            String description) {
+        super(inputEdge, outputType, description);
+        this.grouping = grouping;
+        this.groupingSet = groupingSet;
+        this.aggCalls = aggCalls;
+        this.window = window;
+        this.inputTimeFieldIndex = inputTimeFieldIndex;
+        this.namedWindowProperties = namedWindowProperties;
+    }
+
+    @Override
+    protected Transformation translateToPlanInternal(PlannerBase planner) {
+        final ExecNode inputNode = (ExecNode) getInputNodes().get(0);
+        final Transformation inputTransform = inputNode.translateToPlan(planner);
+        final RowType inputRowType = (RowType) inputNode.getOutputType();
+        final RowType outputRowType =
[GitHub] [flink] flinkbot edited a comment on pull request #14663: [FLINK-20925][table-planner-blink] Separate implemention of StreamExecLookupJoin and BatchExecLookupJoin
flinkbot edited a comment on pull request #14663: URL: https://github.com/apache/flink/pull/14663#issuecomment-760769392 ## CI report: * 4224ed129067a1550f812e8803c3298d523c2495 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12120) * 852af6f79074f5d25ef8194963c464feb0038787 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-19067) resource_manager and dispatcher register on different nodes in HA mode will cause FileNotFoundException
[ https://issues.apache.org/jira/browse/FLINK-19067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17266965#comment-17266965 ] JieFang.He commented on FLINK-19067: Thank you for your answers and suggestions > resource_manager and dispatcher register on different nodes in HA mode will > cause FileNotFoundException > --- > > Key: FLINK-19067 > URL: https://issues.apache.org/jira/browse/FLINK-19067 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.11.1 >Reporter: JieFang.He >Assignee: Robert Metzger >Priority: Major > Attachments: flink-jobmanager-deployer-hejiefang01.log, > flink-jobmanager-deployer-hejiefang02.log, > flink-taskmanager-deployer-hejiefang01.log, > flink-taskmanager-deployer-hejiefang02.log > > > When running examples/batch/WordCount.jar, it will fail with the exception: > {code:java} > Caused by: java.io.FileNotFoundException: > /data2/flink/storageDir/default/blob/job_d29414828f614d5466e239be4d3889ac/blob_p-a2ebe1c5aa160595f214b4bd0f39d80e42ee2e93-f458f1c12dc023e78d25f191de1d7c4b > (No such file or directory) > at java.io.FileInputStream.open0(Native Method) > at java.io.FileInputStream.open(FileInputStream.java:195) > at java.io.FileInputStream.(FileInputStream.java:138) > at > org.apache.flink.core.fs.local.LocalDataInputStream.(LocalDataInputStream.java:50) > at > org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:143) > at > org.apache.flink.runtime.blob.FileSystemBlobStore.get(FileSystemBlobStore.java:105) > at > org.apache.flink.runtime.blob.FileSystemBlobStore.get(FileSystemBlobStore.java:87) > at > org.apache.flink.runtime.blob.BlobServer.getFileInternal(BlobServer.java:501) > at > org.apache.flink.runtime.blob.BlobServerConnection.get(BlobServerConnection.java:231) > at > org.apache.flink.runtime.blob.BlobServerConnection.run(BlobServerConnection.java:117) > {code} > > I think the reason is that the job files are uploaded to the dispatcher node, > but the task gets the job files from the resource_manager node. So in HA mode, > it needs to be ensured that they are on one node > -- This message was sent by Atlassian Jira (v8.3.4#803005)
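For reference, the usual way to avoid this class of problem is to keep the HA storage directory on a file system that every JobManager/Dispatcher node can reach (HDFS, NFS, S3, ...), rather than a node-local path. A minimal sketch of the relevant flink-conf.yaml entries, with illustrative host names and paths:

```yaml
# HA storage must be readable and writable from every JobManager node;
# a node-local path like /data2/flink/storageDir breaks when dispatcher
# and resource manager run on different machines.
high-availability: zookeeper
high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
high-availability.storageDir: hdfs:///flink/ha/
```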
[jira] [Updated] (FLINK-21003) Flink add Sink to AliyunOSS doesn't work
[ https://issues.apache.org/jira/browse/FLINK-21003?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhangyunyun updated FLINK-21003: Description: When I add a sink to OSS, I use the code below: {code:java} String path = "oss:///"; StreamingFileSink streamingFileSink = StreamingFileSink .forRowFormat(new Path(path), new SimpleStringEncoder("UTF-8")) .withRollingPolicy( DefaultRollingPolicy.builder() .withRolloverInterval(TimeUnit.MINUTES.toMillis(5)) .withInactivityInterval(TimeUnit.MINUTES.toMillis(1)) .withMaxPartSize(1024 * 1024 * 10) .build() ).build(); strStream.addSink(streamingFileSink);{code} It raises an error: {code:java} Recoverable writers on Hadoop are only supported for HDF {code} Are there any mistakes I made? OR I want to use Aliyun OSS to store the stream data split into different files. The official Flink documentation's example is shown below: {code:java} // Write to OSS bucket stream.writeAsText("oss:///") {code} How can I use this to split the output into different files by the data's attributes? Thanks! was: When I add a sink to OSS, use the code below: {code:java} String path = "oss:///"; StreamingFileSink streamingFileSink = StreamingFileSink .forRowFormat(new Path(path), new SimpleStringEncoder("UTF-8")) .withRollingPolicy( DefaultRollingPolicy.builder() .withRolloverInterval(TimeUnit.MINUTES.toMillis(5)) .withInactivityInterval(TimeUnit.MINUTES.toMillis(1)) .withMaxPartSize(1024 * 1024 * 10) .build() ).build(); strStream.addSink(streamingFileSink);{code} It occus an error: {code:java} Recoverable writers on Hadoop are only supported for HDF {code} Is there something I made a mistake? I want to use Aliyun OSS to store the stream data split to different files. The Flink official document's example is use below: {code:java} // Write to OSS bucket stream.writeAsText("oss:///") {code} How to use this to split to different files by the data's attributes? Thanks! 
> Flink add Sink to AliyunOSS doesn't work > > > Key: FLINK-21003 > URL: https://issues.apache.org/jira/browse/FLINK-21003 > Project: Flink > Issue Type: Bug > Components: Connectors / FileSystem >Affects Versions: 1.11.0 >Reporter: zhangyunyun >Priority: Major > > When I add a sink to OSS, use the code below: > {code:java} > String path = "oss:///"; > StreamingFileSink streamingFileSink = StreamingFileSink > .forRowFormat(new Path(path), new SimpleStringEncoder("UTF-8")) > .withRollingPolicy( > DefaultRollingPolicy.builder() > .withRolloverInterval(TimeUnit.MINUTES.toMillis(5)) > .withInactivityInterval(TimeUnit.MINUTES.toMillis(1)) > .withMaxPartSize(1024 * 1024 * 10) > .build() > ).build(); > strStream.addSink(streamingFileSink);{code} > It occus an error: > {code:java} > Recoverable writers on Hadoop are only supported for HDF > {code} > Is there any mistakes I made? > OR > I want to use Aliyun OSS to store the stream data split to different files. > The Flink official document's example is use below: > {code:java} > // Write to OSS bucket > stream.writeAsText("oss:///") > {code} > How to use this to split to different files by the data's attributes? > > Thanks! > > > > > > > > > > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
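Two hedged notes on the questions above, based on how StreamingFileSink works in general: the "Recoverable writers on Hadoop are only supported for HDFS"-style error indicates the configured Hadoop file system (here, OSS) does not expose the RecoverableWriter that StreamingFileSink needs for checkpointed writes in this Flink version; and splitting output by record attributes is what a custom BucketAssigner does, returning a per-record bucket id that becomes a sub-directory under the base path. The plain-JDK sketch below (all names illustrative) shows only the bucket-id idea, not the Flink API itself:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AttributeBucketing {
    // Mimics what a custom BucketAssigner#getBucketId does: derive a bucket
    // (sub-directory) name from a record attribute, so records with different
    // attributes land in different files under the same base path.
    static String bucketFor(String record) {
        // Assumed record format "category,payload": the category is the bucket id.
        return record.split(",", 2)[0];
    }

    // Groups records by bucket id, the way the sink groups them into part files.
    static Map<String, StringBuilder> partition(List<String> records) {
        Map<String, StringBuilder> buckets = new HashMap<>();
        for (String r : records) {
            buckets.computeIfAbsent(bucketFor(r), k -> new StringBuilder())
                   .append(r).append('\n');
        }
        return buckets;
    }
}
```

In Flink itself the equivalent would be `.withBucketAssigner(...)` on the StreamingFileSink builder with an implementation of `org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner` whose `getBucketId(record, context)` returns the attribute-derived string.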
[jira] [Updated] (FLINK-21003) Flink add Sink to AliyunOSS doesn't work
[ https://issues.apache.org/jira/browse/FLINK-21003?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhangyunyun updated FLINK-21003: Description: When I add a sink to OSS, use the code below: {code:java} String path = "oss:///"; StreamingFileSink streamingFileSink = StreamingFileSink .forRowFormat(new Path(path), new SimpleStringEncoder("UTF-8")) .withRollingPolicy( DefaultRollingPolicy.builder() .withRolloverInterval(TimeUnit.MINUTES.toMillis(5)) .withInactivityInterval(TimeUnit.MINUTES.toMillis(1)) .withMaxPartSize(1024 * 1024 * 10) .build() ).build(); strStream.addSink(streamingFileSink);{code} It occus an error: {code:java} Recoverable writers on Hadoop are only supported for HDF {code} Is there something I made a mistake? I want to use Aliyun OSS to store the stream data split to different files. The Flink official document's example is use below: {code:java} // Write to OSS bucket stream.writeAsText("oss:///") {code} How to use this to split to different files by the data's attributes? Thanks! was: When I add a sink to OSS, use the code below: {code:java} //代码占位符 String path = "oss:///"; StreamingFileSink streamingFileSink = StreamingFileSink .forRowFormat(new Path(path), new SimpleStringEncoder("UTF-8")) .withRollingPolicy( DefaultRollingPolicy.builder() .withRolloverInterval(TimeUnit.MINUTES.toMillis(5)) .withInactivityInterval(TimeUnit.MINUTES.toMillis(1)) .withMaxPartSize(1024 * 1024 * 10) .build() ).build(); strStream.addSink(streamingFileSink);{code} It occus an error: {code:java} //代码占位符 Recoverable writers on Hadoop are only supported for HDF {code} Is there something I made a mistake? I want to use Aliyun OSS to store the stream data split to different files. The Flink official document's example is use below: {code:java} //代码占位符 // Write to OSS bucket stream.writeAsText("oss:///") {code} How to use this to split to different files by the data's attributes? Thanks! 
> Flink add Sink to AliyunOSS doesn't work > > > Key: FLINK-21003 > URL: https://issues.apache.org/jira/browse/FLINK-21003 > Project: Flink > Issue Type: Bug > Components: Connectors / FileSystem >Affects Versions: 1.11.0 >Reporter: zhangyunyun >Priority: Major > > When I add a sink to OSS, use the code below: > > {code:java} > String path = "oss:///"; > StreamingFileSink streamingFileSink = StreamingFileSink > .forRowFormat(new Path(path), new SimpleStringEncoder("UTF-8")) > .withRollingPolicy( > DefaultRollingPolicy.builder() > .withRolloverInterval(TimeUnit.MINUTES.toMillis(5)) > .withInactivityInterval(TimeUnit.MINUTES.toMillis(1)) > .withMaxPartSize(1024 * 1024 * 10) > .build() > ).build(); > strStream.addSink(streamingFileSink);{code} > > It occus an error: > > {code:java} > Recoverable writers on Hadoop are only supported for HDF > {code} > Is there something I made a mistake? > I want to use Aliyun OSS to store the stream data split to different files. > The Flink official document's example is use below: > {code:java} > // Write to OSS bucket > stream.writeAsText("oss:///") > {code} > How to use this to split to different files by the data's attributes? > > Thanks! > > > > > > > > > > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-21003) Flink add Sink to AliyunOSS doesn't work
zhangyunyun created FLINK-21003: --- Summary: Flink add Sink to AliyunOSS doesn't work Key: FLINK-21003 URL: https://issues.apache.org/jira/browse/FLINK-21003 Project: Flink Issue Type: Bug Components: Connectors / FileSystem Affects Versions: 1.11.0 Reporter: zhangyunyun When I add a sink to OSS, I use the code below: {code:java} // code placeholder String path = "oss:///"; StreamingFileSink streamingFileSink = StreamingFileSink .forRowFormat(new Path(path), new SimpleStringEncoder("UTF-8")) .withRollingPolicy( DefaultRollingPolicy.builder() .withRolloverInterval(TimeUnit.MINUTES.toMillis(5)) .withInactivityInterval(TimeUnit.MINUTES.toMillis(1)) .withMaxPartSize(1024 * 1024 * 10) .build() ).build(); strStream.addSink(streamingFileSink);{code} It raises an error: {code:java} // code placeholder Recoverable writers on Hadoop are only supported for HDF {code} Is there a mistake I made? I want to use Aliyun OSS to store the stream data split into different files. The official Flink documentation's example is shown below: {code:java} // code placeholder // Write to OSS bucket stream.writeAsText("oss:///") {code} How can I use this to split the output into different files by the data's attributes? Thanks! -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-21001) Flink job is blocked while using tableEnvironment with tableFunction and join
[ https://issues.apache.org/jira/browse/FLINK-21001?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17266956#comment-17266956 ] Wu commented on FLINK-21001: I started the job at 15:48. The job was blocked and did not continue to run until 19:09. The client log is in the attachment. > Flink job is blocked while using tableEnvironment with tableFunction and join > - > > Key: FLINK-21001 > URL: https://issues.apache.org/jira/browse/FLINK-21001 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.11.2 > Environment: flink-11.2 >Reporter: Wu >Priority: Major > Attachments: client_log.txt > > > The code is as follows. > {code:java} > // code placeholder > package com.oppo.recdata.datapipe; > import com.oppo.recdata.datapipe.flink.transform.ExplodeDataTypeEnum; > import com.oppo.recdata.datapipe.flink.transform.ExplodeModify; > import com.oppo.recdata.datapipe.flink.transform.TableExplode; > import > com.oppo.recdata.datapipe.flink.transform.function.CollectMapAggregateFunction; > import org.apache.flink.table.api.*; > import static org.apache.flink.table.api.Expressions.row; > /** > * @author wujianz...@oppo.com > */ > public class BatchTable { > public static void main(String[] args) { > EnvironmentSettings settings = > EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); > TableEnvironment tableEnv = TableEnvironment.create(settings); > ExplodeModify modify = new ExplodeModify(ExplodeDataTypeEnum.string, > null, "&"); > tableEnv.createTemporarySystemFunction("explode", new > TableExplode(modify)); > tableEnv.createFunction("collect_map", > CollectMapAggregateFunction.class); > Table table = tableEnv.fromValues( > DataTypes.ROW( > DataTypes.FIELD("buuid", DataTypes.STRING()), > DataTypes.FIELD("docType", DataTypes.INT()), > DataTypes.FIELD("viewTime", DataTypes.INT()), > DataTypes.FIELD("subCategory", DataTypes.STRING()) > ), > row("John", "1", "36", "NBA") > ); > 
tableEnv.createTemporaryView("feeds_expose_click_profile", table); > Table add_profile = tableEnv.sqlQuery("select buuid, cast(docType as > varchar) as docType, viewTime, subCategory from feeds_expose_click_profile > where buuid is not null and docType is not null and viewTime > 0"); > tableEnv.createTemporaryView("add_profile", add_profile); > Table cate2Click = tableEnv.sqlQuery("select buuid, docType, > viewTime, cate2 from add_profile, LATERAL TABLE(explode(subCategory)) as > t(cate2) where subCategory is not null"); > tableEnv.createTemporaryView("cate2_click", cate2Click); > Table cate2_detail = tableEnv.sqlQuery("select cate2, sum(viewTime) > as viewTimeSum, buuid, docType from cate2_click GROUP BY buuid, cate2, > docType"); > tableEnv.createTemporaryView("user_cate2_detail", cate2_detail); > Table user_global_cate2 = tableEnv.sqlQuery("select > 'gcate2_24h_click_sumtime' as fieldName, sum(viewTime) as fieldValue, buuid > as keyName, docType from cate2_click group by buuid, docType"); > tableEnv.createTemporaryView("user_global_cate2", user_global_cate2); > Table global_user_cate2 = tableEnv.sqlQuery("select cate2 as > fieldName, sum(viewTime) as fieldValue, 'guser_cate2_24h_click_sumtime' as > keyName, docType from cate2_click group by cate2, docType "); > tableEnv.createTemporaryView("global_user_cate2",global_user_cate2); > Table global_user_global_cate2 = tableEnv.sqlQuery("select > 'guser_gcate2_24h_click_sumtime' as fieldName, sum(viewTime) as fieldValue, > 'global_feature' as keyName, docType from cate2_click group by docType"); > tableEnv.createTemporaryView("global_user_global_cate2", > global_user_global_cate2); > Table cate2_cs_detail = tableEnv.sqlQuery("select a.cate2 as > fieldName, (a.viewTimeSum + 0.2) / (b.fieldValue * c.fieldValue / > d.fieldValue + 0.2) as fieldValue, a.buuid as keyName, a.docType from > user_cate2_detail a join user_global_cate2 b on a.buuid = b.keyName and > a.docType = b.docType join global_user_cate2 c on a.cate2 = 
c.fieldName and > a.docType = c.docType join global_user_global_cate2 d on a.docType = > d.docType where a.viewTimeSum > 0 and b.fieldValue > 0 and c.fieldValue > 0 > and d.fieldValue > 0"); > tableEnv.createTemporaryView("cate2_cs_detail", cate2_cs_detail); > Table cate2Cs = tableEnv.sqlQuery("select 'cate2_24h_click_sumtimeds' > as fieldName, collect_map(fieldName, ROUND(fieldValue, 5)) as fieldValue, > concat(docType, '#', keyName) as keyName from cate2_cs_detail where > fieldValue < 0 or
[jira] [Commented] (FLINK-21002) Support exactly once sink for JDBC in Table SQL API
[ https://issues.apache.org/jira/browse/FLINK-21002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17266955#comment-17266955 ] bruce lu commented on FLINK-21002: -- *Super! It's a great feature in Table/SQL* > Support exactly once sink for JDBC in Table SQL API > --- > > Key: FLINK-21002 > URL: https://issues.apache.org/jira/browse/FLINK-21002 > Project: Flink > Issue Type: New Feature > Components: Connectors / JDBC, Table SQL / Ecosystem >Reporter: Jark Wu >Priority: Major > > FLINK-15578 implements the exactly once JDBC sink based on XA transaction. > We should also expose this feature in Table API/SQL. -- This message was sent by Atlassian Jira (v8.3.4#803005)
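For illustration only: since this ticket is precisely about exposing the FLINK-15578 XA-based exactly-once JDBC sink in Table API/SQL, no DDL for it exists yet. A purely hypothetical sketch of what such a DDL could look like (every sink-related option name below is invented to show the shape of the feature, not an implemented API):

```sql
-- Hypothetical: sketch of what FLINK-21002 could expose in DDL.
CREATE TABLE orders_sink (
  order_id BIGINT,
  amount   DECIMAL(10, 2),
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://db:5432/shop',
  'table-name' = 'orders',
  -- invented options: an XA-capable data source plus a delivery-guarantee switch
  'sink.delivery-guarantee' = 'exactly-once',
  'sink.xa.datasource-class' = 'org.postgresql.xa.PGXADataSource'
);
```

The design question the exposure has to answer is how the user supplies an XADataSource through string-typed connector options, which is why the XA data source class appears as an option in this sketch.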
[jira] [Updated] (FLINK-21001) Flink job is blocked while using tableEnvironment with tableFunction and join
[ https://issues.apache.org/jira/browse/FLINK-21001?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wu updated FLINK-21001: --- Attachment: (was: image-2021-01-18-09-49-35-431.png) > Flink job is blocked while using tableEnvironment with tableFunction and join > - > > Key: FLINK-21001 > URL: https://issues.apache.org/jira/browse/FLINK-21001 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.11.2 > Environment: flink-11.2 >Reporter: Wu >Priority: Major > Attachments: client_log.txt > > > The code is as follow. > {code:java} > //代码占位符 > package com.oppo.recdata.datapipe; > import com.oppo.recdata.datapipe.flink.transform.ExplodeDataTypeEnum; > import com.oppo.recdata.datapipe.flink.transform.ExplodeModify; > import com.oppo.recdata.datapipe.flink.transform.TableExplode; > import > com.oppo.recdata.datapipe.flink.transform.function.CollectMapAggregateFunction; > import org.apache.flink.table.api.*; > import static org.apache.flink.table.api.Expressions.row; > /** > * @author wujianz...@oppo.com > */ > public class BatchTable { > public static void main(String[] args) { > EnvironmentSettings settings = > EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); > TableEnvironment tableEnv = TableEnvironment.create(settings); > ExplodeModify modify = new ExplodeModify(ExplodeDataTypeEnum.string, > null, "&"); > tableEnv.createTemporarySystemFunction("explode", new > TableExplode(modify)); > tableEnv.createFunction("collect_map", > CollectMapAggregateFunction.class); > Table table = tableEnv.fromValues( > DataTypes.ROW( > DataTypes.FIELD("buuid", DataTypes.STRING()), > DataTypes.FIELD("docType", DataTypes.INT()), > DataTypes.FIELD("viewTime", DataTypes.INT()), > DataTypes.FIELD("subCategory", DataTypes.STRING()) > ), > row("John", "1", "36", "NBA") > ); > tableEnv.createTemporaryView("feeds_expose_click_profile", table); > Table add_profile = tableEnv.sqlQuery("select buuid, cast(docType as > 
varchar) as docType, viewTime, subCategory from feeds_expose_click_profile > where buuid is not null and docType is not null and viewTime > 0"); > tableEnv.createTemporaryView("add_profile", add_profile); > Table cate2Click = tableEnv.sqlQuery("select buuid, docType, > viewTime, cate2 from add_profile, LATERAL TABLE(explode(subCategory)) as > t(cate2) where subCategory is not null"); > tableEnv.createTemporaryView("cate2_click", cate2Click); > Table cate2_detail = tableEnv.sqlQuery("select cate2, sum(viewTime) > as viewTimeSum, buuid, docType from cate2_click GROUP BY buuid, cate2, > docType"); > tableEnv.createTemporaryView("user_cate2_detail", cate2_detail); > Table user_global_cate2 = tableEnv.sqlQuery("select > 'gcate2_24h_click_sumtime' as fieldName, sum(viewTime) as fieldValue, buuid > as keyName, docType from cate2_click group by buuid, docType"); > tableEnv.createTemporaryView("user_global_cate2", user_global_cate2); > Table global_user_cate2 = tableEnv.sqlQuery("select cate2 as > fieldName, sum(viewTime) as fieldValue, 'guser_cate2_24h_click_sumtime' as > keyName, docType from cate2_click group by cate2, docType "); > tableEnv.createTemporaryView("global_user_cate2",global_user_cate2); > Table global_user_global_cate2 = tableEnv.sqlQuery("select > 'guser_gcate2_24h_click_sumtime' as fieldName, sum(viewTime) as fieldValue, > 'global_feature' as keyName, docType from cate2_click group by docType"); > tableEnv.createTemporaryView("global_user_global_cate2", > global_user_global_cate2); > Table cate2_cs_detail = tableEnv.sqlQuery("select a.cate2 as > fieldName, (a.viewTimeSum + 0.2) / (b.fieldValue * c.fieldValue / > d.fieldValue + 0.2) as fieldValue, a.buuid as keyName, a.docType from > user_cate2_detail a join user_global_cate2 b on a.buuid = b.keyName and > a.docType = b.docType join global_user_cate2 c on a.cate2 = c.fieldName and > a.docType = c.docType join global_user_global_cate2 d on a.docType = > d.docType where a.viewTimeSum > 0 and b.fieldValue > 0 
and c.fieldValue > 0 > and d.fieldValue > 0"); > tableEnv.createTemporaryView("cate2_cs_detail", cate2_cs_detail); > Table cate2Cs = tableEnv.sqlQuery("select 'cate2_24h_click_sumtimeds' > as fieldName, collect_map(fieldName, ROUND(fieldValue, 5)) as fieldValue, > concat(docType, '#', keyName) as keyName from cate2_cs_detail where > fieldValue < 0 or fieldValue >= 0 group by keyName, docType"); > cate2Cs.execute().print(); > } > } > {code} > The
[jira] [Updated] (FLINK-21001) Flink job is blocked while using tableEnvironment with tableFunction and join
[ https://issues.apache.org/jira/browse/FLINK-21001?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wu updated FLINK-21001: --- Attachment: image-2021-01-18-09-49-35-431.png

> Flink job is blocked while using tableEnvironment with tableFunction and join
> -----------------------------------------------------------------------------
>
> Key: FLINK-21001
> URL: https://issues.apache.org/jira/browse/FLINK-21001
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API
> Affects Versions: 1.11.2
> Environment: flink-1.11.2
> Reporter: Wu
> Priority: Major
> Attachments: client_log.txt
>
> The code is as follows.

{code:java}
// code placeholder
package com.oppo.recdata.datapipe;

import com.oppo.recdata.datapipe.flink.transform.ExplodeDataTypeEnum;
import com.oppo.recdata.datapipe.flink.transform.ExplodeModify;
import com.oppo.recdata.datapipe.flink.transform.TableExplode;
import com.oppo.recdata.datapipe.flink.transform.function.CollectMapAggregateFunction;
import org.apache.flink.table.api.*;

import static org.apache.flink.table.api.Expressions.row;

/**
 * @author wujianz...@oppo.com
 */
public class BatchTable {
    public static void main(String[] args) {
        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);
        ExplodeModify modify = new ExplodeModify(ExplodeDataTypeEnum.string, null, "&");
        tableEnv.createTemporarySystemFunction("explode", new TableExplode(modify));
        tableEnv.createFunction("collect_map", CollectMapAggregateFunction.class);
        Table table = tableEnv.fromValues(
                DataTypes.ROW(
                        DataTypes.FIELD("buuid", DataTypes.STRING()),
                        DataTypes.FIELD("docType", DataTypes.INT()),
                        DataTypes.FIELD("viewTime", DataTypes.INT()),
                        DataTypes.FIELD("subCategory", DataTypes.STRING())
                ),
                row("John", "1", "36", "NBA")
        );
        tableEnv.createTemporaryView("feeds_expose_click_profile", table);
        Table add_profile = tableEnv.sqlQuery("select buuid, cast(docType as varchar) as docType, viewTime, subCategory from feeds_expose_click_profile where buuid is not null and docType is not null and viewTime > 0");
        tableEnv.createTemporaryView("add_profile", add_profile);
        Table cate2Click = tableEnv.sqlQuery("select buuid, docType, viewTime, cate2 from add_profile, LATERAL TABLE(explode(subCategory)) as t(cate2) where subCategory is not null");
        tableEnv.createTemporaryView("cate2_click", cate2Click);
        Table cate2_detail = tableEnv.sqlQuery("select cate2, sum(viewTime) as viewTimeSum, buuid, docType from cate2_click GROUP BY buuid, cate2, docType");
        tableEnv.createTemporaryView("user_cate2_detail", cate2_detail);
        Table user_global_cate2 = tableEnv.sqlQuery("select 'gcate2_24h_click_sumtime' as fieldName, sum(viewTime) as fieldValue, buuid as keyName, docType from cate2_click group by buuid, docType");
        tableEnv.createTemporaryView("user_global_cate2", user_global_cate2);
        Table global_user_cate2 = tableEnv.sqlQuery("select cate2 as fieldName, sum(viewTime) as fieldValue, 'guser_cate2_24h_click_sumtime' as keyName, docType from cate2_click group by cate2, docType");
        tableEnv.createTemporaryView("global_user_cate2", global_user_cate2);
        Table global_user_global_cate2 = tableEnv.sqlQuery("select 'guser_gcate2_24h_click_sumtime' as fieldName, sum(viewTime) as fieldValue, 'global_feature' as keyName, docType from cate2_click group by docType");
        tableEnv.createTemporaryView("global_user_global_cate2", global_user_global_cate2);
        Table cate2_cs_detail = tableEnv.sqlQuery("select a.cate2 as fieldName, (a.viewTimeSum + 0.2) / (b.fieldValue * c.fieldValue / d.fieldValue + 0.2) as fieldValue, a.buuid as keyName, a.docType from user_cate2_detail a join user_global_cate2 b on a.buuid = b.keyName and a.docType = b.docType join global_user_cate2 c on a.cate2 = c.fieldName and a.docType = c.docType join global_user_global_cate2 d on a.docType = d.docType where a.viewTimeSum > 0 and b.fieldValue > 0 and c.fieldValue > 0 and d.fieldValue > 0");
        tableEnv.createTemporaryView("cate2_cs_detail", cate2_cs_detail);
        Table cate2Cs = tableEnv.sqlQuery("select 'cate2_24h_click_sumtimeds' as fieldName, collect_map(fieldName, ROUND(fieldValue, 5)) as fieldValue, concat(docType, '#', keyName) as keyName from cate2_cs_detail where fieldValue < 0 or fieldValue >= 0 group by keyName, docType");
        cate2Cs.execute().print();
    }
}
{code}

> The client log is attached as client_log.txt.
[jira] [Updated] (FLINK-21001) Flink job is blocked while using tableEnvironment with tableFunction and join
[ https://issues.apache.org/jira/browse/FLINK-21001?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wu updated FLINK-21001: --- Attachment: client_log.txt > Flink job is blocked while using tableEnvironment with tableFunction and join > Key: FLINK-21001 > URL: https://issues.apache.org/jira/browse/FLINK-21001 > Project: Flink > Issue Type: Bug > Components: Table SQL / API > Affects Versions: 1.11.2 > Reporter: Wu > Priority: Major > Attachments: client_log.txt
[jira] [Commented] (FLINK-21002) Support exactly once sink for JDBC in Table SQL API
[ https://issues.apache.org/jira/browse/FLINK-21002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17266954#comment-17266954 ] Leonard Xu commented on FLINK-21002: +1 to support this feature in Table/SQL > Support exactly once sink for JDBC in Table SQL API > --- > > Key: FLINK-21002 > URL: https://issues.apache.org/jira/browse/FLINK-21002 > Project: Flink > Issue Type: New Feature > Components: Connectors / JDBC, Table SQL / Ecosystem >Reporter: Jark Wu >Priority: Major > > FLINK-15578 implements the exactly once JDBC sink based on XA transaction. > We should also expose this feature in Table API/SQL. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot edited a comment on pull request #14678: [FLINK-20833][runtime] Add pluggable failure listener in job manager
flinkbot edited a comment on pull request #14678: URL: https://github.com/apache/flink/pull/14678#issuecomment-761905721 ## CI report: * 296a46dab704e3a3553bb7a43128a3f4774bfb77 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12162) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #14678: [FLINK-20833][runtime] Add pluggable failure listener in job manager
flinkbot edited a comment on pull request #14678: URL: https://github.com/apache/flink/pull/14678#issuecomment-761905721 ## CI report: * 296a46dab704e3a3553bb7a43128a3f4774bfb77 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12162)
[GitHub] [flink] flinkbot commented on pull request #14678: [FLINK-20833][runtime] Add pluggable failure listener in job manager
flinkbot commented on pull request #14678: URL: https://github.com/apache/flink/pull/14678#issuecomment-761905721 ## CI report: * 296a46dab704e3a3553bb7a43128a3f4774bfb77 UNKNOWN
[jira] [Commented] (FLINK-20833) Expose pluggable interface for exception analysis and metrics reporting in Execution Graph
[ https://issues.apache.org/jira/browse/FLINK-20833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17266945#comment-17266945 ] Zhenqiu Huang commented on FLINK-20833: --- [~rmetzger] [~trohrmann] Thanks for these suggestions. 1) There is a numRestarts metric that tracks all restarts in the scheduler; in a session cluster it counts the restarts of all jobs. To capture the failures of each job, it makes sense to add a DefaultFailureListener that reports metrics at the job level. 2) Agree. I moved the initialization into the JobMaster. 3) It totally makes sense to encourage use of the plugin framework. I changed FailureListenerFactory to look up FailureListener implementations from both the resource folder and the plugin manager. 4) As for documenting the feature, I am not sure where the right place is. Would you please give some suggestions after reviewing the PR? > Expose pluggable interface for exception analysis and metrics reporting in > Execution Graph > --- > > Key: FLINK-20833 > URL: https://issues.apache.org/jira/browse/FLINK-20833 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Affects Versions: 1.12.0 >Reporter: Zhenqiu Huang >Assignee: Zhenqiu Huang >Priority: Minor > Labels: pull-request-available > > Platform users of Apache Flink usually want to classify the failure reason (for example user code, networking, dependencies, etc.) for Flink jobs and emit metrics for those analyzed results, so that the platform can provide an accurate value for system reliability by distinguishing failures caused by user logic from system issues.
[GitHub] [flink] flinkbot commented on pull request #14678: [FLINK-20833][runtime] Add pluggable failure listener in job manager
flinkbot commented on pull request #14678: URL: https://github.com/apache/flink/pull/14678#issuecomment-761904406 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit 296a46dab704e3a3553bb7a43128a3f4774bfb77 (Mon Jan 18 00:02:10 UTC 2021) **Warnings:** * **1 pom.xml file was touched**: Check for build and licensing issues. * No documentation files were touched! Remember to keep the Flink docs up to date! Mention the bot in a comment to re-run the automated checks. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[jira] [Updated] (FLINK-20833) Expose pluggable interface for exception analysis and metrics reporting in Execution Graph
[ https://issues.apache.org/jira/browse/FLINK-20833?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-20833: --- Labels: pull-request-available (was: ) > Expose pluggable interface for exception analysis and metrics reporting in > Execution Graph
[GitHub] [flink] HuangZhenQiu opened a new pull request #14678: [FLINK-20833][runtime] Add pluggable failure listener in job manager
HuangZhenQiu opened a new pull request #14678: URL: https://github.com/apache/flink/pull/14678 ## What is the purpose of the change Add a pluggable failure listener SPI so that Flink users can customize failure-handling logic via the plugin framework. For example, the user can use the listener to emit metrics for different types of errors (application or platform). ## Brief change log - Add FailureListener as an SPI and DefaultFailureListener - Add FailureListenerFactory for loading listeners from job resources or the plugin manager. - Add test cases for loading via both SPI and the plugin framework. ## Verifying this change This change added tests and can be verified as follows: - Added integration tests in flink-tests for testing FailureListenerFactory plugin loading of customized failure listeners. - Added unit test for FailureListenerFactory ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (yes) - If yes, how is the feature documented? (not documented)
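The PR itself defines the real interfaces; as a rough, self-contained illustration of the pattern it describes (the names `FailureListener` and `DefaultFailureListener` come from the change log above, but all signatures and the classification logic here are assumptions, not Flink's actual code), a pluggable listener discovered via Java's standard `ServiceLoader` SPI mechanism might look like:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

// Hypothetical sketch of the SPI described in the PR; the real Flink
// interfaces may differ in names and signatures.
interface FailureListener {
    void onFailure(Throwable cause);
}

// A default listener that classifies failures, e.g. to emit per-category metrics
// distinguishing application (user-code) errors from platform errors.
class DefaultFailureListener implements FailureListener {
    final List<String> categories = new ArrayList<>();

    @Override
    public void onFailure(Throwable cause) {
        // Crude illustrative classification: runtime exceptions from user code
        // vs. everything else (errors, infrastructure failures, ...).
        String category = (cause instanceof RuntimeException) ? "user" : "system";
        categories.add(category);
    }
}

public class FailureListenerDemo {
    public static void main(String[] args) {
        // ServiceLoader discovers implementations registered under
        // META-INF/services/<interface FQCN>; none are registered in this
        // sketch, so we fall back to the default listener.
        List<FailureListener> listeners = new ArrayList<>();
        ServiceLoader.load(FailureListener.class).forEach(listeners::add);
        if (listeners.isEmpty()) {
            listeners.add(new DefaultFailureListener());
        }

        for (FailureListener l : listeners) {
            l.onFailure(new IllegalStateException("user code failed"));
            l.onFailure(new OutOfMemoryError("heap"));
        }
        DefaultFailureListener d = (DefaultFailureListener) listeners.get(0);
        System.out.println(d.categories); // [user, system]
    }
}
```

The fallback-to-default step mirrors the factory behavior described in the PR (look up listeners from a resource folder or the plugin manager); Flink's plugin framework additionally isolates each plugin in its own classloader, which `ServiceLoader` alone does not do.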
[GitHub] [flink] zentol edited a comment on pull request #14630: [FLINK-20915][docker] Move docker entrypoint to distribution
zentol edited a comment on pull request #14630: URL: https://github.com/apache/flink/pull/14630#issuecomment-761888056 > if someone downloads the binary distribution of Flink and tries to run it on a VM, are they expected to set an appropriate [FLINK_]CLASSPATH variable? To answer your example: No, it is not required. As for why the kubernetes mode does it: It assembles a plain `java ...` command, and uses `${FLINK_CLASSPATH}` as a placeholder for the jvm classpath parameter (because the actual value is determined within the container based on its contents, not on the client-side), which is populated by the script.
[GitHub] [flink] zentol edited a comment on pull request #14630: [FLINK-20915][docker] Move docker entrypoint to distribution
zentol edited a comment on pull request #14630: URL: https://github.com/apache/flink/pull/14630#issuecomment-761888056 > if someone downloads the binary distribution of Flink and tries to run it on a VM, are they expected to set an appropriate [FLINK_]CLASSPATH variable? To answer your example: No, it is generally not required. They may though, if they want it to contain something specific that would not be put onto the classpath by the distribution on its own. An example might be jars that are put into a mounted directory. As for why the kubernetes mode does it: It assembles a plain `java ...` command, and uses `${FLINK_CLASSPATH}` as a placeholder for the jvm classpath parameter (because the actual value is determined within the container, not on the client-side), which is populated by the script.
[GitHub] [flink] zentol commented on pull request #14630: [FLINK-20915][docker] Move docker entrypoint to distribution
zentol commented on pull request #14630: URL: https://github.com/apache/flink/pull/14630#issuecomment-761888056 > if someone downloads the binary distribution of Flink and tries to run it on a VM, are they expected to set an appropriate [FLINK_]CLASSPATH variable? To answer your example: No, it is generally not required. They may though, if they want it to contain something specific that would not be put onto the classpath by the distribution on its own. An example might be jars that are put into a mounted directory.
[GitHub] [flink] zentol commented on pull request #14630: [FLINK-20915][docker] Move docker entrypoint to distribution
zentol commented on pull request #14630: URL: https://github.com/apache/flink/pull/14630#issuecomment-761885131 > I'm confused Well that makes 2 of us. https://github.com/docker-library/official-images/pull/9249#issuecomment-756731803 > If I understand you correctly, then it would be fine for us to move **_everything in the docker-entrypoint.sh_** into a **_new scripts_** within distribution, and **_just call that script_** from docker-entrypoint.sh with all passed arguments. https://github.com/docker-library/official-images/pull/9249#issuecomment-756902869 > Yes, I'd love to see this.
[GitHub] [flink] flinkbot edited a comment on pull request #14499: [FLINK-15156] Warn user if System.exit() is called in user code
flinkbot edited a comment on pull request #14499: URL: https://github.com/apache/flink/pull/14499#issuecomment-751477248 ## CI report: * 7c7133b540da13a268fddabd97cc873509210f7a Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12157)
[GitHub] [flink] flinkbot edited a comment on pull request #14672: [FLINK-20998][docs] - flink-raw.jar no longer mentioned + prettier maven xml dependencies
flinkbot edited a comment on pull request #14672: URL: https://github.com/apache/flink/pull/14672#issuecomment-761593893 ## CI report: * f1733387653ff03bd40a52b6fa8856027024fa1b Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12158)
[GitHub] [flink] flinkbot edited a comment on pull request #14672: [FLINK-20998][docs] - flink-raw.jar no longer mentioned + prettier maven xml dependencies
flinkbot edited a comment on pull request #14672: URL: https://github.com/apache/flink/pull/14672#issuecomment-761593893 ## CI report: * 50f142696f4553e0bf7742ef4b46be78f57461d6 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12152) * f1733387653ff03bd40a52b6fa8856027024fa1b UNKNOWN
[GitHub] [flink] sv3ndk commented on a change in pull request #14672: [FLINK-20998][docs] - flink-raw.jar no longer mentioned + prettier maven xml dependencies
sv3ndk commented on a change in pull request #14672: URL: https://github.com/apache/flink/pull/14672#discussion_r559220386 ## File path: docs/dev/table/connectors/formats/raw.zh.md ## @@ -33,13 +33,7 @@ Raw format 允许读写原始(基于字节)值作为单个列。 注意: 这种格式将 `null` 值编码成 `byte[]` 类型的 `null`。这样在 `upsert-kafka` 中使用时可能会有限制,因为 `upsert-kafka` 将 `null` 值视为 墓碑消息(在键上删除)。因此,如果该字段可能具有 `null` 值,我们建议避免使用 `upsert-kafka` 连接器和 `raw` format 作为 `value.format`。 -依赖 - - -{% assign connector = site.data.sql-connectors['raw'] %} -{% include sql-connector-download-table.zh.html -connector=connector -%} +Raw format 连接器是内置的。 Review comment: > I'm thinking about adding table border to it, like following, what do you think? That's much clearer indeed, thanks for the suggestion. > we should also update the Chinese version of the sql-connnector-download-table.html. Sorry for having overlooked that one, and thanks for the commit, I copied here your update. > Cool translation ;) Thanks. I copy-pasted it from the "DataGen" connector, I don't speak Chinese myself unfortunately. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #14676: [FLINK-20933][python] Config Python Operator Use Managed Memory In Python DataStream
flinkbot edited a comment on pull request #14676: URL: https://github.com/apache/flink/pull/14676#issuecomment-761815610 ## CI report: * 2bdb2217ac6e1abcf45d5b329b83b72aa54be7bb Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12155)
[GitHub] [flink] flinkbot edited a comment on pull request #14499: [FLINK-15156] Warn user if System.exit() is called in user code
flinkbot edited a comment on pull request #14499: URL: https://github.com/apache/flink/pull/14499#issuecomment-751477248 ## CI report: * 56c9467b56b6e6f490103a6be681921d33da Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12079) * 7c7133b540da13a268fddabd97cc873509210f7a Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12157)
[jira] [Commented] (FLINK-21002) Support exactly once sink for JDBC in Table SQL API
[ https://issues.apache.org/jira/browse/FLINK-21002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17266819#comment-17266819 ] Jark Wu commented on FLINK-21002: - We should first discuss which new options to introduce. > Support exactly once sink for JDBC in Table SQL API > --- > > Key: FLINK-21002 > URL: https://issues.apache.org/jira/browse/FLINK-21002 > Project: Flink > Issue Type: New Feature > Components: Connectors / JDBC, Table SQL / Ecosystem >Reporter: Jark Wu >Priority: Major > > FLINK-15578 implements the exactly once JDBC sink based on XA transactions. > We should also expose this feature in Table API/SQL.
[GitHub] [flink] wuchong commented on pull request #10847: [FLINK-15578][connectors/jdbc] implement exactly once JDBC sink
wuchong commented on pull request #10847: URL: https://github.com/apache/flink/pull/10847#issuecomment-761838593 I also created FLINK-21002 to expose this feature in Table API/SQL.
[jira] [Created] (FLINK-21002) Support exactly once sink for JDBC in Table SQL API
Jark Wu created FLINK-21002: --- Summary: Support exactly once sink for JDBC in Table SQL API Key: FLINK-21002 URL: https://issues.apache.org/jira/browse/FLINK-21002 Project: Flink Issue Type: New Feature Components: Connectors / JDBC, Table SQL / Ecosystem Reporter: Jark Wu FLINK-15578 implements the exactly once JDBC sink based on XA transaction. We should also expose this feature in Table API/SQL.
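The FLINK-15578 sink coordinates JDBC writes through XA transactions, which are identified by `javax.transaction.xa.Xid` values: a format id plus a global transaction id and a branch qualifier. As a minimal, self-contained sketch of such an id (illustrative only — Flink's connector has its own `Xid` implementation derived from job and checkpoint ids, which is not shown here):

```java
import javax.transaction.xa.Xid;
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical Xid implementation for illustration. An XA-based exactly-once
// sink prepares one transaction per checkpoint under such an id and commits
// it once the checkpoint completes.
public class SimpleXid implements Xid {
    private final int formatId;
    private final byte[] globalTransactionId;
    private final byte[] branchQualifier;

    public SimpleXid(int formatId, long globalId, long branchId) {
        this.formatId = formatId;
        // Encode the two ids as fixed-width 8-byte arrays.
        this.globalTransactionId = ByteBuffer.allocate(8).putLong(globalId).array();
        this.branchQualifier = ByteBuffer.allocate(8).putLong(branchId).array();
    }

    @Override public int getFormatId() { return formatId; }
    @Override public byte[] getGlobalTransactionId() { return globalTransactionId.clone(); }
    @Override public byte[] getBranchQualifier() { return branchQualifier.clone(); }

    // Value equality matters: on recovery the sink must recognize the
    // prepared transactions it created before the failure.
    @Override
    public boolean equals(Object o) {
        if (!(o instanceof SimpleXid)) return false;
        SimpleXid other = (SimpleXid) o;
        return formatId == other.formatId
                && Arrays.equals(globalTransactionId, other.globalTransactionId)
                && Arrays.equals(branchQualifier, other.branchQualifier);
    }

    @Override
    public int hashCode() {
        return 31 * formatId + Arrays.hashCode(globalTransactionId);
    }
}
```

Exposing this in Table API/SQL would then mostly be a matter of wiring connector options to this existing DataStream sink; as the discussion above notes, the concrete option names were still to be decided.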
[GitHub] [flink] flinkbot edited a comment on pull request #14499: [FLINK-15156] Warn user if System.exit() is called in user code
flinkbot edited a comment on pull request #14499: URL: https://github.com/apache/flink/pull/14499#issuecomment-751477248 ## CI report: * 56c9467b56b6e6f490103a6be681921d33da Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12079) * 7c7133b540da13a268fddabd97cc873509210f7a UNKNOWN
[GitHub] [flink] wuchong commented on a change in pull request #10847: [FLINK-15578][connectors/jdbc] implement exactly once JDBC sink
wuchong commented on a change in pull request #10847: URL: https://github.com/apache/flink/pull/10847#discussion_r559200884 ## File path: flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/CheckpointAndXidSerializersTest.java ## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.xa; + +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; + +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** XaSerializersTest. */ +public class CheckpointAndXidSerializersTest { Review comment: I think a better and easier way to test serializer is extending `SerializerTestBase`. ## File path: flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/XidSerializersTest.java ## @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.xa; + +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; + +import org.junit.Test; + +import javax.transaction.xa.Xid; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; + +import static org.junit.Assert.assertEquals; + +/** XaSerializersTest. */ +public class XidSerializersTest { Review comment: I think a better and easier way to test serializers is extending `SerializerTestBase`.
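The pattern the reviewer is suggesting — inheriting shared round-trip checks from a common base class instead of hand-writing them per serializer — can be sketched in plain Java. This is a simplified, self-contained stand-in for illustration only; the class and method names below are invented and are not Flink's actual `SerializerTestBase` API.

```java
import java.nio.charset.StandardCharsets;
import java.util.Objects;

// Plain-Java sketch of the idea behind a shared serializer test base
// (names invented for illustration): the abstract base owns the
// round-trip checks, so each concrete serializer test only supplies
// the serializer and its test data.
abstract class RoundTripTestBase<T> {
    abstract byte[] serialize(T value);

    abstract T deserialize(byte[] bytes);

    abstract T[] testData();

    // Shared check inherited by every concrete serializer test.
    final boolean roundTripsCleanly() {
        for (T value : testData()) {
            if (!Objects.equals(value, deserialize(serialize(value)))) {
                return false;
            }
        }
        return true;
    }
}

public class StringRoundTripTest extends RoundTripTestBase<String> {
    @Override
    byte[] serialize(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    String deserialize(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    @Override
    String[] testData() {
        return new String[] {"xid-1", "checkpoint-42", ""};
    }

    public static void main(String[] args) {
        System.out.println(new StringRoundTripTest().roundTripsCleanly()); // prints: true
    }
}
```

The payoff is the one the review hints at: each new serializer test shrinks to data plus two conversions, and the shared checks only need fixing in one place.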
[GitHub] [flink] hwanju edited a comment on pull request #14499: [FLINK-15156] Warn user if System.exit() is called in user code
hwanju edited a comment on pull request #14499: URL: https://github.com/apache/flink/pull/14499#issuecomment-761572560

> I believe sources are relatively important, in particular because some users have their own implementations there.
> @hwanju do you want to address this in this PR, or in a follow up?

This is a nice catch! It should be addressed as part of this PR. When this patch was written, the old Flink version invoked `run()` from `invoke()` on the same thread, but with the mailbox approach it now spawns a separate thread for `run()`. `FlinkSecurityManager` does support a thread-inherited monitoring flag, so why wasn't this spawned thread protected? The reason is that inheritable thread-local variables are copied at thread construction time, not when the thread runs, and the construction happens in `loadAndInstantiateInvokable`, before `invoke`. By wrapping `loadAndInstantiateInvokable` with exit monitoring, I see it is protected as follows:

```
org.apache.flink.runtime.UserSystemExitException: Flink user code attempted to exit JVM.
	at org.apache.flink.runtime.security.FlinkSecurityManager.checkExit(FlinkSecurityManager.java:184)
	at java.lang.Runtime.exit(Runtime.java:107)
	at java.lang.System.exit(System.java:971)
	at org.apache.flink.streaming.tests.FailureTestSource.run(FailureTestSource.java:65)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:245)
```

Added a unit test (it reproduces the issue without the fix and passes with it).
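The thread-local inheritance pitfall described above can be reproduced with a few lines of self-contained Java (this is an illustrative demo, not Flink code): an `InheritableThreadLocal` value is snapshotted when the child `Thread` object is constructed, so setting the flag after construction never reaches that child thread.

```java
public class InheritedFlagDemo {
    // Monitoring flag modeled loosely after a thread-inherited security
    // flag (illustrative only, not Flink's FlinkSecurityManager).
    private static final InheritableThreadLocal<Boolean> MONITORED =
            new InheritableThreadLocal<Boolean>() {
                @Override
                protected Boolean initialValue() {
                    return Boolean.FALSE;
                }
            };

    static boolean flagSeenByChild() {
        MONITORED.set(false); // reset so repeated calls behave the same
        final boolean[] seen = new boolean[1];
        // The child's inheritable thread-locals are copied HERE, at
        // Thread construction time...
        Thread child = new Thread(() -> seen[0] = MONITORED.get());
        // ...so enabling the flag afterwards does not reach the child.
        MONITORED.set(true);
        child.start();
        try {
            child.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println("flag seen by child: " + flagSeenByChild()); // prints: flag seen by child: false
    }
}
```

This matches the fix described in the comment: the monitoring flag has to be enabled before the thread object is constructed (i.e. around `loadAndInstantiateInvokable`), not merely before it starts running.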
[GitHub] [flink] hwanju commented on a change in pull request #14499: [FLINK-15156] Warn user if System.exit() is called in user code
hwanju commented on a change in pull request #14499: URL: https://github.com/apache/flink/pull/14499#discussion_r559200445

## File path: flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java

```
@@ -115,4 +127,35 @@
     public static boolean isDeclarativeResourceManagementEnabled(Configuration configuration) {
         return configuration.get(ENABLE_DECLARATIVE_RESOURCE_MANAGEMENT)
                 && !System.getProperties().containsKey("flink.tests.disable-declarative");
     }
+
+    /** The mode of how to handle user code attempting to exit JVM. */
+    public enum UserSystemExitMode {
+        DISABLED("Flink is not monitoring or intercepting calls to System.exit()"),
+        LOG("Log exit attempt with stack trace but still allowing exit to be performed"),
+        THROW("Throw exception when exit is attempted disallowing JVM termination");
+
+        private final String description;
+
+        UserSystemExitMode(String description) {
+            this.description = description;
+        }
+
+        public static Description getConfigDescription() {
+            Description.DescriptionBuilder builder = Description.builder();
+            List<TextElement> modeDescriptions =
+                    new ArrayList<>(UserSystemExitMode.values().length);
+            builder.text(
+                    "Flag to check user code exiting system by terminating JVM (e.g., System.exit())");
+            for (UserSystemExitMode mode : UserSystemExitMode.values()) {
+                modeDescriptions.add(
+                        text(String.format("%s - %s", mode.name(), mode.getDescription())));
+            }
+            builder.list(modeDescriptions.toArray(new TextElement[modeDescriptions.size()]));
```

Review comment:

> In your suggested documentation, I think HALT_ON_FATAL_ERROR should be INTERCEPT_USER_SYSTEM_EXIT.

This part is obviously due to my misreading of your comment. The new explanation seems clearer.

> I'm also happy to not apply this suggestion. I believe we are optimizing the last 1% out of two rarely used expert features.

Agreed, but maybe adding this explanation will help. Added.
[GitHub] [flink] godfreyhe commented on a change in pull request #14644: [FLINK-20840][table-planner] Properly transpose projection to join
godfreyhe commented on a change in pull request #14644: URL: https://github.com/apache/flink/pull/14644#discussion_r55919 ## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java ## @@ -308,6 +308,14 @@ // // Other Exec Options // +@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING) Review comment: why we need to introduce this ? ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala ## @@ -160,6 +160,14 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) case TimestampKind.ROWTIME => createRowtimeIndicatorType(true) case TimestampKind.REGULAR => createSqlType(TIMESTAMP, timestampType.getPrecision) } + case LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE => Review comment: ditto ## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java ## @@ -18,41 +18,19 @@ package org.apache.flink.table.planner.functions.sql; -import org.apache.flink.table.planner.calcite.FlinkTypeFactory; -import org.apache.flink.table.planner.functions.sql.internal.SqlAuxiliaryGroupAggFunction; -import org.apache.flink.table.planner.plan.type.FlinkReturnTypes; -import org.apache.flink.table.planner.plan.type.NumericExceptFirstOperandChecker; - -import org.apache.calcite.sql.SqlAggFunction; -import org.apache.calcite.sql.SqlFunction; -import org.apache.calcite.sql.SqlFunctionCategory; -import org.apache.calcite.sql.SqlGroupedWindowFunction; -import org.apache.calcite.sql.SqlIdentifier; -import org.apache.calcite.sql.SqlKind; -import org.apache.calcite.sql.SqlOperator; -import org.apache.calcite.sql.SqlOperatorBinding; -import org.apache.calcite.sql.SqlPostfixOperator; -import org.apache.calcite.sql.SqlSyntax; -import org.apache.calcite.sql.fun.SqlStdOperatorTable; -import org.apache.calcite.sql.type.InferTypes; -import 
org.apache.calcite.sql.type.OperandTypes; -import org.apache.calcite.sql.type.ReturnTypes; -import org.apache.calcite.sql.type.SqlOperandCountRanges; -import org.apache.calcite.sql.type.SqlReturnTypeInference; -import org.apache.calcite.sql.type.SqlTypeFamily; -import org.apache.calcite.sql.type.SqlTypeName; -import org.apache.calcite.sql.type.SqlTypeTransforms; -import org.apache.calcite.sql.util.ReflectiveSqlOperatorTable; -import org.apache.calcite.sql.validate.SqlMonotonicity; -import org.apache.calcite.sql.validate.SqlNameMatcher; -import org.apache.calcite.sql.validate.SqlNameMatchers; - -import java.util.Arrays; -import java.util.List; - -import static org.apache.flink.table.planner.plan.type.FlinkReturnTypes.ARG0_VARCHAR_FORCE_NULLABLE; -import static org.apache.flink.table.planner.plan.type.FlinkReturnTypes.STR_MAP_NULLABLE; -import static org.apache.flink.table.planner.plan.type.FlinkReturnTypes.VARCHAR_2000_NULLABLE; +import org.apache.flink.table.planner.calcite.*; +import org.apache.flink.table.planner.functions.sql.internal.*; +import org.apache.flink.table.planner.plan.type.*; + +import org.apache.calcite.sql.*; +import org.apache.calcite.sql.fun.*; +import org.apache.calcite.sql.type.*; +import org.apache.calcite.sql.util.*; +import org.apache.calcite.sql.validate.*; + +import java.util.*; + +import static org.apache.flink.table.planner.plan.type.FlinkReturnTypes.*; Review comment: ditto ? ## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java ## @@ -83,8 +83,8 @@ @Internal public class PlannerContext { -private final RelDataTypeSystem typeSystem = new FlinkTypeSystem(); -private final FlinkTypeFactory typeFactory = new FlinkTypeFactory(typeSystem); +private final RelDataTypeSystem typeSystem; Review comment: does this really need to change ?
[GitHub] [flink] liuyufei9527 commented on pull request #14341: [FLINK-20496][state backends] RocksDB partitioned index/filters option.
liuyufei9527 commented on pull request #14341: URL: https://github.com/apache/flink/pull/14341#issuecomment-761829624 @Myasuka [CI](https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=12150=results) has passed.
[jira] [Closed] (FLINK-20989) Functions in ExplodeFunctionUtil should handle null data to avoid NPE
[ https://issues.apache.org/jira/browse/FLINK-20989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] godfrey he closed FLINK-20989. -- Resolution: Fixed Fixed in 1.11.4: 0c090cdd48d2b2362b805fe40a45cdfced5f2738 > Functions in ExplodeFunctionUtil should handle null data to avoid NPE > - > > Key: FLINK-20989 > URL: https://issues.apache.org/jira/browse/FLINK-20989 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.11.3 >Reporter: godfrey he >Assignee: godfrey he >Priority: Major > Labels: pull-request-available > Fix For: 1.11.4 > > > The following test case will encounter NPE: > {code:scala} > val t = tEnv.fromValues( > DataTypes.ROW( > DataTypes.FIELD("a", DataTypes.INT()), > DataTypes.FIELD("b", DataTypes.ARRAY(DataTypes.STRING())) > ), > row(1, Array("aa", "bb", "cc")), > row(2, null), > row(3, Array("dd")) > ) > tEnv.registerTable("T", t) > tEnv.executeSql("SELECT a, s FROM T, UNNEST(T.b) as A (s)").print() > {code} > Exception is > {code:java} > Caused by: java.lang.NullPointerException > at > scala.collection.mutable.ArrayOps$ofRef$.length$extension(ArrayOps.scala:192) > at scala.collection.mutable.ArrayOps$ofRef.length(ArrayOps.scala:192) > at > scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:32) > at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) > at > org.apache.flink.table.planner.plan.utils.ObjectExplodeTableFunc.eval(ExplodeFunctionUtil.scala:34) > {code} > The reason is functions in ExplodeFunctionUtil do not handle null data. Since > 1.12, the bug is fixed, see https://issues.apache.org/jira/browse/FLINK-18528
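The fix pattern behind this issue is simply a null guard in the explode/UNNEST function: a SQL `NULL` array cell should yield zero rows instead of throwing an NPE. A minimal, self-contained Java sketch of that behavior follows; the `explode` helper is hypothetical and stands in for the `eval()` of an explode table function such as `ObjectExplodeTableFunc`, it is not Flink's actual code.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class ExplodeDemo {
    // Hypothetical stand-in for a table function's eval(): turn one array
    // cell into one row per element, treating a null array as empty.
    static <T> List<T> explode(T[] array) {
        if (array == null) {
            return Collections.emptyList(); // NULL array -> no rows, no NPE
        }
        return Arrays.asList(array);
    }

    public static void main(String[] args) {
        System.out.println(explode(new String[] {"aa", "bb", "cc"})); // prints: [aa, bb, cc]
        System.out.println(explode((String[]) null)); // prints: []
    }
}
```

Without the guard, iterating the null array is exactly what produces the `ArrayOps.length` NPE shown in the issue's stack trace.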
[GitHub] [flink] godfreyhe merged pull request #14664: [FLINK-20989][table-planner-blink] Functions in ExplodeFunctionUtil should handle null data to avoid NPE
godfreyhe merged pull request #14664: URL: https://github.com/apache/flink/pull/14664
[GitHub] [flink] wuchong commented on pull request #14675: [FLINK-18383][docs]translate "JDBC SQL Connector" documentation into …
wuchong commented on pull request #14675: URL: https://github.com/apache/flink/pull/14675#issuecomment-761827845 Hi @leonardBang , could you help to review this?
[GitHub] [flink] wuchong commented on a change in pull request #14672: [FLINK-20998][docs] - flink-raw.jar no longer mentioned + prettier maven xml dependencies
wuchong commented on a change in pull request #14672: URL: https://github.com/apache/flink/pull/14672#discussion_r559191882 ## File path: docs/dev/table/connectors/formats/raw.zh.md ## @@ -33,13 +33,7 @@ Raw format 允许读写原始(基于字节)值作为单个列。 注意: 这种格式将 `null` 值编码成 `byte[]` 类型的 `null`。这样在 `upsert-kafka` 中使用时可能会有限制,因为 `upsert-kafka` 将 `null` 值视为 墓碑消息(在键上删除)。因此,如果该字段可能具有 `null` 值,我们建议避免使用 `upsert-kafka` 连接器和 `raw` format 作为 `value.format`。 -依赖 - - -{% assign connector = site.data.sql-connectors['raw'] %} -{% include sql-connector-download-table.zh.html -connector=connector -%} +Raw format 连接器是内置的。 Review comment: Cool translation ;)
[jira] [Commented] (FLINK-20726) Introduce Pulsar connector
[ https://issues.apache.org/jira/browse/FLINK-20726?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17266797#comment-17266797 ] Jianyun Zhao commented on FLINK-20726: -- Thank you very much for your help! > Introduce Pulsar connector > -- > > Key: FLINK-20726 > URL: https://issues.apache.org/jira/browse/FLINK-20726 > Project: Flink > Issue Type: New Feature > Components: Connectors / Common >Affects Versions: 1.13.0 >Reporter: Jianyun Zhao >Assignee: Jianyun Zhao >Priority: Major > Fix For: 1.13.0 > > > Pulsar is an important player in messaging middleware, and it is essential > for Flink to support Pulsar. > Our existing code is maintained at > [streamnative/pulsar-flink|https://github.com/streamnative/pulsar-flink] , > next we will split it into several pr merges back to the community.
[GitHub] [flink] flinkbot edited a comment on pull request #14676: [FLINK-20933][python] Config Python Operator Use Managed Memory In Python DataStream
flinkbot edited a comment on pull request #14676: URL: https://github.com/apache/flink/pull/14676#issuecomment-761815610 ## CI report: * 2bdb2217ac6e1abcf45d5b329b83b72aa54be7bb Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12155)
[GitHub] [flink] pipiliang removed a comment on pull request #14677: Merge pull request #1 from apache/master
pipiliang removed a comment on pull request #14677: URL: https://github.com/apache/flink/pull/14677#issuecomment-761817426 merge
[GitHub] [flink] pipiliang commented on pull request #14677: Merge pull request #1 from apache/master
pipiliang commented on pull request #14677: URL: https://github.com/apache/flink/pull/14677#issuecomment-761817426 merge
[GitHub] [flink] pipiliang closed pull request #14677: Merge pull request #1 from apache/master
pipiliang closed pull request #14677: URL: https://github.com/apache/flink/pull/14677
[GitHub] [flink] flinkbot commented on pull request #14677: Merge pull request #1 from apache/master
flinkbot commented on pull request #14677: URL: https://github.com/apache/flink/pull/14677#issuecomment-761817266 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit a6637832514fa157b121719cdb996b7dd074afab (Sun Jan 17 14:01:01 UTC 2021) **Warnings:** * No documentation files were touched! Remember to keep the Flink docs up to date! * **Invalid pull request title: No valid Jira ID provided** Mention the bot in a comment to re-run the automated checks. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] pipiliang opened a new pull request #14677: Merge pull request #1 from apache/master
pipiliang opened a new pull request #14677: URL: https://github.com/apache/flink/pull/14677 update fork ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. *(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce
a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
[GitHub] [flink] flinkbot commented on pull request #14676: [FLINK-20933][python] Config Python Operator Use Managed Memory In Python DataStream
flinkbot commented on pull request #14676: URL: https://github.com/apache/flink/pull/14676#issuecomment-761815610 ## CI report: * 2bdb2217ac6e1abcf45d5b329b83b72aa54be7bb UNKNOWN
[GitHub] [flink] flinkbot edited a comment on pull request #14674: [FLINK-20831][table/sql runtime] Introduce the in-memory topN buffer
flinkbot edited a comment on pull request #14674: URL: https://github.com/apache/flink/pull/14674#issuecomment-761760740 ## CI report: * e30838a252ad02c36ef1a50ae161f5cc7b48cff3 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12153)
[GitHub] [flink] flinkbot commented on pull request #14676: [FLINK-20933][python] Config Python Operator Use Managed Memory In Python DataStream
flinkbot commented on pull request #14676: URL: https://github.com/apache/flink/pull/14676#issuecomment-761812777 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit 2bdb2217ac6e1abcf45d5b329b83b72aa54be7bb (Sun Jan 17 13:27:53 UTC 2021) **Warnings:** * No documentation files were touched! Remember to keep the Flink docs up to date! * **This pull request references an unassigned [Jira ticket](https://issues.apache.org/jira/browse/FLINK-20933).** According to the [code contribution guide](https://flink.apache.org/contributing/contribute-code.html), tickets need to be assigned before starting with the implementation work. Mention the bot in a comment to re-run the automated checks.
[GitHub] [flink] HuangXingBo opened a new pull request #14676: [FLINK-20933][python] Config Python Operator Use Managed Memory In Python DataStream
HuangXingBo opened a new pull request #14676: URL: https://github.com/apache/flink/pull/14676 ## What is the purpose of the change *This pull request will Config Python Operator Use Managed Memory In Python DataStream* ## Brief change log - *Add method `setManagedMemory` to set all Python Operator to use Managed Memory* ## Verifying this change This change added tests and can be verified as follows: - *Correct original test `test_from_data_stream` to cover this feature* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable)
[GitHub] [flink] flinkbot edited a comment on pull request #14675: [FLINK-18383][docs]translate "JDBC SQL Connector" documentation into …
flinkbot edited a comment on pull request #14675: URL: https://github.com/apache/flink/pull/14675#issuecomment-761769906 ## CI report: * 519566b1079bfca772b00c6c905cbb177594b346 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12154)
[GitHub] [flink] flinkbot commented on pull request #14675: [FLINK-18383][docs]translate "JDBC SQL Connector" documentation into …
flinkbot commented on pull request #14675: URL: https://github.com/apache/flink/pull/14675#issuecomment-761769906 ## CI report: * 519566b1079bfca772b00c6c905cbb177594b346 UNKNOWN
[jira] [Commented] (FLINK-18383) Translate "JDBC SQL Connector" page into Chinese
[ https://issues.apache.org/jira/browse/FLINK-18383?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17266739#comment-17266739 ] bruce lu commented on FLINK-18383: -- The translation job is finished! Hi, [~jark], could you help me to review the [PR|https://github.com/apache/flink/pull/14675]? > Translate "JDBC SQL Connector" page into Chinese > > > Key: FLINK-18383 > URL: https://issues.apache.org/jira/browse/FLINK-18383 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Documentation, Table SQL / Ecosystem >Reporter: Jark Wu >Assignee: bruce lu >Priority: Major > Labels: pull-request-available > > The page url is > https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/connectors/jdbc.html > The markdown file is located in flink/docs/dev/table/connectors/jdbc.zh.md
[GitHub] [flink] flinkbot commented on pull request #14675: [FLINK-18383][docs]translate "JDBC SQL Connector" documentation into …
flinkbot commented on pull request #14675: URL: https://github.com/apache/flink/pull/14675#issuecomment-761767301 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit 519566b1079bfca772b00c6c905cbb177594b346 (Sun Jan 17 10:26:10 UTC 2021) ✅ no warnings Mention the bot in a comment to re-run the automated checks.