[jira] [Assigned] (FLINK-29477) ClassCastException when collect primitive array to Python
[ https://issues.apache.org/jira/browse/FLINK-29477?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xingbo Huang reassigned FLINK-29477: Assignee: Juntao Hu > ClassCastException when collect primitive array to Python > - > > Key: FLINK-29477 > URL: https://issues.apache.org/jira/browse/FLINK-29477 > Project: Flink > Issue Type: Bug > Components: API / Python >Affects Versions: 1.16.0, 1.15.2 >Reporter: Juntao Hu >Assignee: Juntao Hu >Priority: Major > Labels: pull-request-available > Fix For: 1.16.0, 1.15.3 > > > How to reproduce this bug: > {code:java} > ds = env.from_collection([1, 2], type_info=Types.PRIMITIVE_ARRAY(Types.INT())) > ds.execute_and_collect(){code} > got: > {code:java} > java.lang.ClassCastException: class [I cannot be cast to class > [Ljava.lang.Object {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
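For readers unfamiliar with the JVM class names in that message: `[I` is the runtime name of `int[]`, and a primitive array is not an `Object[]`, so any collect path that casts the returned value to `Object[]` fails exactly as shown. A minimal sketch outside Flink (plain Java; the exact cast site inside the PyFlink collect path is an assumption):

{code:java}
// Minimal reproduction of the cast failure outside Flink. "[I" is the JVM's
// runtime name for int[]; a primitive array is not an Object[], so the cast fails.
public class PrimitiveArrayCastDemo {
    public static void main(String[] args) {
        Object boxed = new Integer[] {1, 2};
        Object primitive = new int[] {1, 2};

        Object[] ok = (Object[]) boxed; // fine: Integer[] is an Object[]
        System.out.println(ok.length);

        // throws: class [I cannot be cast to class [Ljava.lang.Object;
        Object[] broken = (Object[]) primitive;
        System.out.println(broken.length);
    }
}
{code}

Presumably the fix handles primitive array types separately instead of casting them to `Object[]`.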
[jira] [Closed] (FLINK-29477) ClassCastException when collect primitive array to Python
[ https://issues.apache.org/jira/browse/FLINK-29477?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xingbo Huang closed FLINK-29477. Fix Version/s: 1.16.0 (was: 1.17.0) (was: 1.16.1) Resolution: Fixed Merged into master via c85e6ec45bebb2eb376a911e11294cd118893fb3 Merged into release-1.16 via b0d4dd3a1eaf648f67e0ca7cb075591ecb69e2c4 Merged into release-1.15 via 507b93eef2af79ef2ad5752e1271e5c8915bb15f > ClassCastException when collect primitive array to Python > - > > Key: FLINK-29477 > URL: https://issues.apache.org/jira/browse/FLINK-29477 > Project: Flink > Issue Type: Bug > Components: API / Python >Affects Versions: 1.16.0, 1.15.2 >Reporter: Juntao Hu >Priority: Major > Labels: pull-request-available > Fix For: 1.16.0, 1.15.3 > > > How to reproduce this bug: > {code:java} > ds = env.from_collection([1, 2], type_info=Types.PRIMITIVE_ARRAY(Types.INT())) > ds.execute_and_collect(){code} > got: > {code:java} > java.lang.ClassCastException: class [I cannot be cast to class > [Ljava.lang.Object {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink] HuangXingBo closed pull request #21016: [FLINK-29477][python] Fix ClassCastException when collect primitive array
HuangXingBo closed pull request #21016: [FLINK-29477][python] Fix ClassCastException when collect primitive array URL: https://github.com/apache/flink/pull/21016
[GitHub] [flink] huyuanfeng2018 commented on pull request #21076: [FLINK-29646][sql-gateway] rest endpoint return a simpler error message
huyuanfeng2018 commented on PR #21076: URL: https://github.com/apache/flink/pull/21076#issuecomment-1279878588 > Thanks for your contribution @huyuanfeng2018. I think this pull request should be merged into master branch rather than release-1.16. Thanks @WencongLiu , I have switched to master
[GitHub] [flink-kubernetes-operator] sap1ens commented on pull request #404: [FLINK-29633] Backport to 1.2
sap1ens commented on PR #404: URL: https://github.com/apache/flink-kubernetes-operator/pull/404#issuecomment-1279876617 Ran failed test locally, worked fine 🤷 Could be flaky
[GitHub] [flink-kubernetes-operator] sap1ens commented on pull request #403: [FLINK-29633] Pass fromSavepoint argument
sap1ens commented on PR #403: URL: https://github.com/apache/flink-kubernetes-operator/pull/403#issuecomment-1279862016 Done https://github.com/apache/flink-kubernetes-operator/pull/404
[GitHub] [flink-kubernetes-operator] sap1ens opened a new pull request, #404: [FLINK-29633] Backport to 1.2
sap1ens opened a new pull request, #404: URL: https://github.com/apache/flink-kubernetes-operator/pull/404 Backporting https://github.com/apache/flink-kubernetes-operator/pull/403 to the 1.2 release branch.
[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #403: [FLINK-29633] Pass fromSavepoint argument
gyfora commented on PR #403: URL: https://github.com/apache/flink-kubernetes-operator/pull/403#issuecomment-1279825176 @sap1ens could you please open a backport PR against the `release-1.2` branch based on the merged commit?
[jira] [Closed] (FLINK-29633) Operator doesn't pass initialSavepointPath as fromSavepoint argument
[ https://issues.apache.org/jira/browse/FLINK-29633?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora closed FLINK-29633. -- Fix Version/s: kubernetes-operator-1.3.0 Resolution: Fixed aa88e9a6738160b539a0974eae176db633a408f7 > Operator doesn't pass initialSavepointPath as fromSavepoint argument > > > Key: FLINK-29633 > URL: https://issues.apache.org/jira/browse/FLINK-29633 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.2.0 >Reporter: Yaroslav Tkachenko >Assignee: Yaroslav Tkachenko >Priority: Critical > Labels: pull-request-available > Fix For: kubernetes-operator-1.3.0 > > > The Kubernetes Operator doesn't pass *initialSavepointPath* from the JobSpec > as a *--fromSavepoint* argument to the JobManager. The operator does update > the configuration, but in the standalone mode, Flink actually [overrides > that|https://github.com/apache/flink/blob/012dc6a9b800bae0cfa5250d38de992ccbabc015/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterEntryPoint.java#L57-L63] > based on the command-line arguments. > *CmdStandaloneJobManagerDecorator* should be updated to include > *fromSavepoint.* -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-kubernetes-operator] gyfora merged pull request #403: [FLINK-29633] Pass fromSavepoint argument
gyfora merged PR #403: URL: https://github.com/apache/flink-kubernetes-operator/pull/403
[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #403: [FLINK-29633] Pass fromSavepoint argument
gyfora commented on code in PR #403: URL: https://github.com/apache/flink-kubernetes-operator/pull/403#discussion_r996349881 ## flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecorator.java: ## @@ -80,9 +80,14 @@ private List getApplicationClusterArgs() { } Boolean allowNonRestoredState = kubernetesJobManagerParameters.getAllowNonRestoredState(); -if (allowNonRestoredState != null) { +if (allowNonRestoredState != null && allowNonRestoredState) { args.add("--allowNonRestoredState"); -args.add(allowNonRestoredState.toString()); +} + +String savepointPath = kubernetesJobManagerParameters.getSavepointPath(); +if (savepointPath != null) { +args.add("--fromSavepoint"); +args.add(savepointPath); Review Comment: You are right, let’s keep it as is -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-connector-dynamodb] hlteoh37 commented on pull request #1: [FLINK-24229][Connectors][DynamoDB] - Add AWS DynamoDB connector
hlteoh37 commented on PR #1: URL: https://github.com/apache/flink-connector-dynamodb/pull/1#issuecomment-1279823905 Question for general discussion: Do we have any thoughts on using conditional expressions? I note that the `BatchWriteItem` API we are using doesn't support this. Maybe we can create a followup JIRA to track this feature.
[GitHub] [flink-kubernetes-operator] sap1ens commented on a diff in pull request #403: [FLINK-29633] Pass fromSavepoint argument
sap1ens commented on code in PR #403: URL: https://github.com/apache/flink-kubernetes-operator/pull/403#discussion_r996348488 ## flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecorator.java: ## @@ -80,9 +80,14 @@ private List getApplicationClusterArgs() { } Boolean allowNonRestoredState = kubernetesJobManagerParameters.getAllowNonRestoredState(); -if (allowNonRestoredState != null) { +if (allowNonRestoredState != null && allowNonRestoredState) { args.add("--allowNonRestoredState"); -args.add(allowNonRestoredState.toString()); +} + +String savepointPath = kubernetesJobManagerParameters.getSavepointPath(); +if (savepointPath != null) { +args.add("--fromSavepoint"); +args.add(savepointPath); Review Comment: I thought about it too, but based on these lines: - https://github.com/apache/flink/blob/master/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterEntryPoint.java#L95 - https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java#L166-L168 it looks like if the flag is not present on the command-line, Flink will override the value from the config file to `false`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
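To make the override behaviour described in those links concrete, here is a simplified, self-contained model of it. The class and method are made up for illustration; only the two `execution.savepoint.*` keys are real Flink options, and the real entry point goes through `SavepointRestoreSettings` rather than a raw map.

```java
// Simplified model (not the actual Flink classes) of the behaviour discussed above:
// the standalone entry point re-derives savepoint restore settings from the command
// line and writes them back into the configuration. The savepoint path is only set
// when present, but the allow-non-restored-state flag is written unconditionally,
// so a missing --allowNonRestoredState resets the config value to false.
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SavepointOverrideSketch {

    static void applyCommandLine(List<String> args, Map<String, String> conf) {
        String path = null;
        int i = args.indexOf("--fromSavepoint");
        if (i >= 0 && i + 1 < args.size()) {
            path = args.get(i + 1);
        }
        boolean allowNonRestored = args.contains("--allowNonRestoredState");

        if (path != null) {
            conf.put("execution.savepoint.path", path);
        }
        // Written back even when the flag is absent -> clobbers the operator-set value.
        conf.put("execution.savepoint.ignore-unclaimed-state", Boolean.toString(allowNonRestored));
    }

    public static void main(String[] args) {
        // What the operator put into the Flink configuration:
        Map<String, String> conf = new HashMap<>();
        conf.put("execution.savepoint.path", "s3://bucket/savepoints/savepoint-123");
        conf.put("execution.savepoint.ignore-unclaimed-state", "true");

        // What the standalone entry point sees on the command line (no savepoint flags):
        applyCommandLine(List.of("--job-classname", "com.example.MyJob"), conf);

        // The path survives, but ignore-unclaimed-state has been reset to "false".
        System.out.println(conf);
    }
}
```

This is why the two consistent options discussed in this thread are either passing both arguments on the command line or dropping them and relying purely on the configuration.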
[GitHub] [flink-connector-dynamodb] hlteoh37 commented on a diff in pull request #1: [FLINK-24229][Connectors][DynamoDB] - Add AWS DynamoDB connector
hlteoh37 commented on code in PR #1: URL: https://github.com/apache/flink-connector-dynamodb/pull/1#discussion_r996348036 ## flink-connector-aws-dynamodb/src/main/java/org/apache/flink/streaming/connectors/dynamodb/sink/DynamoDbSinkBuilder.java: ## @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.dynamodb.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.streaming.connectors.dynamodb.config.DynamoDbTablesConfig; + +import java.util.Optional; +import java.util.Properties; + +/** + * Builder to construct {@link DynamoDbSink}. + * + * The following example shows the minimum setup to create a {@link DynamoDbSink} that writes + * records into DynamoDb + * + * {@code + * private static class DummyDynamoDbRequestConverter implements DynamoDbRequestConverter { + * + * @Override + * public DynamoDbRequest apply(String s) { + * final Map item = new HashMap<>(); + * item.put("your-key", DynamoDbAttributeValue.builder().s(s).build()); + * return DynamoDbRequest.builder() + * .tableName("your-table-name") + * .putRequest(DynamoDbPutRequest.builder().item(item).build()) + * .build(); + * } + * } + * DynamoDbSink dynamoDbSink = DynamoDbSink.builder() + * .setDynamoDbRequestConverter(new DummyDynamoDbRequestConverter()) + * .build(); + * } + * + * If the following parameters are not set in this builder, the following defaults will be used: + * + * + * {@code maxBatchSize} will be 25 + * {@code maxInFlightRequests} will be 50 + * {@code maxBufferedRequests} will be 1 + * {@code maxBatchSizeInBytes} will be 16 MB i.e. {@code 16 * 1000 * 1000} + * {@code maxTimeInBufferMS} will be 5000ms + * {@code maxRecordSizeInBytes} will be 400 KB i.e. {@code 400 * 1000 * 1000} + * {@code failOnError} will be false + * {@code dynamoDbTablesConfig} will be empty meaning no records deduplication will be + * performed by the sink + * + * + * @param type of elements that should be persisted in the destination + */ +@PublicEvolving +public class DynamoDbSinkBuilder +extends AsyncSinkBaseBuilder> { + +private static final int DEFAULT_MAX_BATCH_SIZE = 25; +private static final int DEFAULT_MAX_IN_FLIGHT_REQUESTS = 50; Review Comment: Am thinking how this will work for users if there are multiple inflight requests with records that have the same PK/SK. - Seems like it will be a race condition where the first request to complete will take precedence - If there are failures/partial failures in a batch, the "latest" record in the stream might no longer take precedence. 
That said, this situation is more likely to occur if there are more records with the same PK/SK, which helps us, because that means eventual consistency will be attained quickly. In the scenario above, it seems setting `MAX_IN_FLIGHT_REQUESTS` to 1 would be the main mitigation. I wonder if we should set this as default, or at least call this out to users? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
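For illustration, a hypothetical builder call that applies the mitigation mentioned above. It mirrors the example from the quoted javadoc; the generics and the `setMaxInFlightRequests` setter are assumed to come from `AsyncSinkBaseBuilder`, and since the connector is still under review the method names may change.

```java
// Hypothetical sketch only, based on the builder example in the javadoc quoted above.
// Limiting in-flight requests to 1 serialises batches, which avoids the PK/SK race
// between concurrent BatchWriteItem calls at the cost of throughput.
DynamoDbSink<String> sink =
        DynamoDbSink.<String>builder()
                // converter taken from the DummyDynamoDbRequestConverter javadoc example above
                .setDynamoDbRequestConverter(new DummyDynamoDbRequestConverter())
                .setMaxInFlightRequests(1) // assumed to be inherited from AsyncSinkBaseBuilder
                .build();
```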
[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #403: [FLINK-29633] Pass fromSavepoint argument
gyfora commented on code in PR #403: URL: https://github.com/apache/flink-kubernetes-operator/pull/403#discussion_r996347944 ## flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecorator.java: ## @@ -80,9 +80,14 @@ private List getApplicationClusterArgs() { } Boolean allowNonRestoredState = kubernetesJobManagerParameters.getAllowNonRestoredState(); -if (allowNonRestoredState != null) { +if (allowNonRestoredState != null && allowNonRestoredState) { args.add("--allowNonRestoredState"); -args.add(allowNonRestoredState.toString()); +} + +String savepointPath = kubernetesJobManagerParameters.getSavepointPath(); +if (savepointPath != null) { +args.add("--fromSavepoint"); +args.add(savepointPath); Review Comment: actually I think it might be enough to remove both `fromSavepoint` and `allowNonRestoredState` command line arguments and rely purely on the config. If that works I would actually lean toward that. It's much simpler and more bulletproof to just use the conf -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #403: [FLINK-29633] Pass fromSavepoint argument
gyfora commented on code in PR #403: URL: https://github.com/apache/flink-kubernetes-operator/pull/403#discussion_r996347785 ## flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecorator.java: ## @@ -80,9 +80,14 @@ private List getApplicationClusterArgs() { } Boolean allowNonRestoredState = kubernetesJobManagerParameters.getAllowNonRestoredState(); -if (allowNonRestoredState != null) { +if (allowNonRestoredState != null && allowNonRestoredState) { args.add("--allowNonRestoredState"); -args.add(allowNonRestoredState.toString()); +} + +String savepointPath = kubernetesJobManagerParameters.getSavepointPath(); +if (savepointPath != null) { +args.add("--fromSavepoint"); +args.add(savepointPath); Review Comment: I don't insist I think it's good like this for more visibility, I just wanted to make sure I fully understand the reason behind this particular change. :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-connector-dynamodb] hlteoh37 commented on a diff in pull request #1: [FLINK-24229][Connectors][DynamoDB] - Add AWS DynamoDB connector
hlteoh37 commented on code in PR #1: URL: https://github.com/apache/flink-connector-dynamodb/pull/1#discussion_r996321025 ## flink-connector-aws-dynamodb/src/main/java/org/apache/flink/streaming/connectors/dynamodb/sink/DynamoDbSink.java: ## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.dynamodb.sink; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.connector.base.sink.AsyncSinkBase; +import org.apache.flink.connector.base.sink.writer.BufferedRequestState; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.streaming.connectors.dynamodb.config.DynamoDbTablesConfig; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.Properties; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A DynamoDB Sink that performs async requests against a destination stream using the buffering + * protocol specified in {@link AsyncSinkBase}. + * + * The sink internally uses a {@link + * software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient} to communicate with the AWS + * endpoint. + * + * The behaviour of the buffering may be specified by providing configuration during the sink + * build time. + * + * + * {@code maxBatchSize}: the maximum size of a batch of entries that may be written to + * DynamoDb. + * {@code maxInFlightRequests}: the maximum number of in flight requests that may exist, if + * any more in flight requests need to be initiated once the maximum has been reached, then it + * will be blocked until some have completed + * {@code maxBufferedRequests}: the maximum number of elements held in the buffer, requests to + * add elements will be blocked while the number of elements in the buffer is at the maximum Review Comment: ```suggestion * {@code maxBufferedRequests}: the maximum number of elements held in the buffer, requests to * sink will backpressure while the number of elements in the buffer is at the maximum ``` Nit: This wording might be clearer from a Flink user's POV ## flink-connector-aws-dynamodb/src/test/java/org/apache/flink/streaming/connectors/dynamodb/sink/TableRequestsContainerTest.java: ## @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.dynamodb.sink; + +import org.apache.flink.streaming.connectors.dynamodb.config.DynamoDbTablesConfig; + +import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMap; + +import org.assertj.core.api.Assertions; +import org.junit.Test; +import software.amazon.awssdk.services.dynamodb.model.AttributeValue; +import software.amazon.awssdk.services.dynamodb.model.PutRequest; +import software.amazon.awssdk.services.dynamodb.model.WriteRequest; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** Tests for {@link TableRequestsContainer}. */ +public class TableRequestsContainerTest { +@Test +public void testRequestNotDeduplicatedWhenNoTableConfig() { +new Scenario() +.witItem("table", ImmutableMap.of("pk", "1", "sk", "1")) +
[GitHub] [flink-kubernetes-operator] sap1ens commented on a diff in pull request #403: [FLINK-29633] Pass fromSavepoint argument
sap1ens commented on code in PR #403: URL: https://github.com/apache/flink-kubernetes-operator/pull/403#discussion_r996346689 ## flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecorator.java: ## @@ -80,9 +80,14 @@ private List getApplicationClusterArgs() { } Boolean allowNonRestoredState = kubernetesJobManagerParameters.getAllowNonRestoredState(); -if (allowNonRestoredState != null) { +if (allowNonRestoredState != null && allowNonRestoredState) { args.add("--allowNonRestoredState"); -args.add(allowNonRestoredState.toString()); +} + +String savepointPath = kubernetesJobManagerParameters.getSavepointPath(); +if (savepointPath != null) { +args.add("--fromSavepoint"); +args.add(savepointPath); Review Comment: Yep, I think you're right: it seems like just fixing the `allowNonRestoredState` flag would be enough. However, I think it's logical to provide the two options together via the command-line. They always come together (check `SavepointConfigOptions`, `SavepointRestoreSettings`, etc.) and I was really surprised when I didn't see the `fromSavepoint` argument when debugging. But if you insist on making the smallest change in this PR then I'll revert the `fromSavepoint` part. Let me know! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-29649) upgrade Maven checkstyle plugin to latest version >= v8.30
Samrat Deb created FLINK-29649: -- Summary: upgrade Maven checkstyle plugin to latest version >= v8.30 Key: FLINK-29649 URL: https://issues.apache.org/jira/browse/FLINK-29649 Project: Flink Issue Type: Improvement Reporter: Samrat Deb Currently, while building the project, some typical warnings related to checkstyle appear: ``` [WARNING] Old version of checkstyle detected. Consider updating to >= v8.30 [WARNING] For more information see: https://maven.apache.org/plugins/maven-checkstyle-plugin/examples/upgrading-checkstyle.html [INFO] You have 0 Checkstyle violations. ``` Should it be upgraded to the latest stable version?
[jira] [Updated] (FLINK-29649) upgrade Maven checkstyle plugin to latest version >= v8.30
[ https://issues.apache.org/jira/browse/FLINK-29649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Samrat Deb updated FLINK-29649: --- Description: Currently, while building the project, some typical warnings related to checkstyle appear: {code:java} [WARNING] Old version of checkstyle detected. Consider updating to >= v8.30 [WARNING] For more information see: https://maven.apache.org/plugins/maven-checkstyle-plugin/examples/upgrading-checkstyle.html [INFO] You have 0 Checkstyle violations.{code} Should it be upgraded to the latest stable version? was: Currently, while building the project, some typical warnings related to checkstyle appear: ``` [WARNING] Old version of checkstyle detected. Consider updating to >= v8.30 [WARNING] For more information see: https://maven.apache.org/plugins/maven-checkstyle-plugin/examples/upgrading-checkstyle.html [INFO] You have 0 Checkstyle violations. ``` Should it be upgraded to the latest stable version? > upgrade Maven checkstyle plugin to latest version >= v8.30 > -- > > Key: FLINK-29649 > URL: https://issues.apache.org/jira/browse/FLINK-29649 > Project: Flink > Issue Type: Improvement >Reporter: Samrat Deb >Priority: Minor > > Currently, while building the project, some typical warnings related to > checkstyle appear: > > {code:java} > [WARNING] Old version of checkstyle detected. Consider updating to >= v8.30 > [WARNING] For more information see: > https://maven.apache.org/plugins/maven-checkstyle-plugin/examples/upgrading-checkstyle.html > [INFO] You have 0 Checkstyle violations.{code} > > Should it be upgraded to the latest stable version?
[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #403: [FLINK-29633] Pass fromSavepoint argument
gyfora commented on code in PR #403: URL: https://github.com/apache/flink-kubernetes-operator/pull/403#discussion_r996327566 ## flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecorator.java: ## @@ -80,9 +80,14 @@ private List getApplicationClusterArgs() { } Boolean allowNonRestoredState = kubernetesJobManagerParameters.getAllowNonRestoredState(); -if (allowNonRestoredState != null) { +if (allowNonRestoredState != null && allowNonRestoredState) { args.add("--allowNonRestoredState"); -args.add(allowNonRestoredState.toString()); +} + +String savepointPath = kubernetesJobManagerParameters.getSavepointPath(); +if (savepointPath != null) { +args.add("--fromSavepoint"); +args.add(savepointPath); Review Comment: As we discussed on Slack, before this fix. Both `initialSavepoint` and savepoint upgrade logic worked correctly so there must not really be a need for the `--fromSavepoint` argument -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #403: [FLINK-29633] Pass fromSavepoint argument
gyfora commented on code in PR #403: URL: https://github.com/apache/flink-kubernetes-operator/pull/403#discussion_r996327486 ## flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecorator.java: ## @@ -80,9 +80,14 @@ private List getApplicationClusterArgs() { } Boolean allowNonRestoredState = kubernetesJobManagerParameters.getAllowNonRestoredState(); -if (allowNonRestoredState != null) { +if (allowNonRestoredState != null && allowNonRestoredState) { args.add("--allowNonRestoredState"); -args.add(allowNonRestoredState.toString()); +} + +String savepointPath = kubernetesJobManagerParameters.getSavepointPath(); +if (savepointPath != null) { +args.add("--fromSavepoint"); +args.add(savepointPath); Review Comment: What I meant with my other question is actually why we need this part here :) Seems like before this change the savepoint restore logic worked correctly so the config is probably enough. Wouldn't be enough to simply fix the allowNonRestoredState logic to not put true/false there? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-20350) Incompatible Connectors due to Guava conflict
[ https://issues.apache.org/jira/browse/FLINK-20350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17618133#comment-17618133 ] Samrat Deb commented on FLINK-20350: Hi [~danny.cranmer] In ticket you have already mentioned couple of *Possible Fixes* - Align Guava versions - Shade Guava in either connector I was thinking if we can do relocation of guava package in both kinesis and pub/sub connector this will not conflict each other and they will be able to use their own guava as expected. Thoughts / Suggestion ? > Incompatible Connectors due to Guava conflict > - > > Key: FLINK-20350 > URL: https://issues.apache.org/jira/browse/FLINK-20350 > Project: Flink > Issue Type: Bug > Components: Connectors / Google Cloud PubSub, Connectors / Kinesis >Affects Versions: 1.11.1, 1.11.2 >Reporter: Danny Cranmer >Assignee: Samrat Deb >Priority: Not a Priority > Labels: auto-deprioritized-major, auto-deprioritized-minor > > *Problem* > Kinesis and GCP PubSub connector do not work together. The following error is > thrown. > {code} > java.lang.NoClassDefFoundError: Could not initialize class > io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder > at > org.apache.flink.streaming.connectors.gcp.pubsub.DefaultPubSubSubscriberFactory.getSubscriber(DefaultPubSubSubscriberFactory.java:52) > ~[flink-connector-gcp-pubsub_2.11-1.11.1.jar:1.11.1] > at > org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource.createAndSetPubSubSubscriber(PubSubSource.java:213) > ~[flink-connector-gcp-pubsub_2.11-1.11.1.jar:1.11.1] > at > org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource.open(PubSubSource.java:102) > ~[flink-connector-gcp-pubsub_2.11-1.11.1.jar:1.11.1] > at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > ~[flink-core-1.11.1.jar:1.11.1] > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) > ~[flink-streaming-java_2.11-1.11.1.jar:1.11.1] > at > org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291) > ~[flink-streaming-java_2.11-1.11.1.jar:1.11.1] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473) > ~[flink-streaming-java_2.11-1.11.1.jar:1.11.1] > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92) > ~[flink-streaming-java_2.11-1.11.1.jar:1.11.1] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469) > ~[flink-streaming-java_2.11-1.11.1.jar:1.11.1] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522) > ~[flink-streaming-java_2.11-1.11.1.jar:1.11.1] > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) > ~[flink-runtime_2.11-1.11.1.jar:1.11.1] > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) > ~[flink-runtime_2.11-1.11.1.jar:1.11.1] > at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_252] > {code} > {code} > > org.apache.flink > > flink-connector-gcp-pubsub_${scala.binary.version} > 1.11.1 > > >org.apache.flink > flink-connector-kinesis_${scala.binary.version} > 1.11.1 > > {code} > *Cause* > This is caused by a Guava dependency conflict: > - Kinesis Consumer > {{18.0}} > - GCP PubSub > {{26.0-android}} > {{NettyChannelBuilder}} fails to initialise due to missing method in guava: > - > 
{{com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;CLjava/lang/Object;)V}} > *Possible Fixes* > - Align Guava versions > - Shade Guava in either connector -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [flink-connector-dynamodb] hlteoh37 commented on a diff in pull request #1: [FLINK-24229][Connectors][DynamoDB] - Add AWS DynamoDB connector
hlteoh37 commented on code in PR #1: URL: https://github.com/apache/flink-connector-dynamodb/pull/1#discussion_r996320784 ## flink-connector-aws-dynamodb/src/main/java/org/apache/flink/streaming/connectors/dynamodb/sink/DynamoDbSink.java: ## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.dynamodb.sink; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.connector.base.sink.AsyncSinkBase; +import org.apache.flink.connector.base.sink.writer.BufferedRequestState; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.streaming.connectors.dynamodb.config.DynamoDbTablesConfig; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.Properties; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A DynamoDB Sink that performs async requests against a destination stream using the buffering Review Comment: ```suggestion * A DynamoDB Sink that performs async requests against a destination table using the buffering ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-29620) Flink deployment stuck in UPGRADING state when changing configuration
[ https://issues.apache.org/jira/browse/FLINK-29620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17618123#comment-17618123 ] Gyula Fora commented on FLINK-29620: I think that this might be fixed on 1.2.0 already, could you please verify? > Flink deployment stuck in UPGRADING state when changing configuration > - > > Key: FLINK-29620 > URL: https://issues.apache.org/jira/browse/FLINK-29620 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions: 1.14.2 > Environment: AWS EKS v1.21 > Operator version: 1.1.0 >Reporter: liad shachoach >Priority: Major > > When I update the configuration of a flink deployment I observe one of two > scenarios: > Success: > This happens when the job has not started - if I change the configuration > quick enough: > {code:java} > 2022-10-13 06:50:54,336 o.a.f.k.o.r.d.AbstractJobReconciler [INFO > ][load-streaming/validator-process-124] Upgrading/Restarting running job, > suspending first... > 2022-10-13 06:50:54,343 o.a.f.k.o.r.d.ApplicationReconciler [INFO > ][load-streaming/validator-process-124] Job is not running but HA metadata is > available for last state restore, ready for upgrade > 2022-10-13 06:50:54,353 o.a.f.k.o.u.FlinkUtils [INFO > ][load-streaming/validator-process-124] Deleting JobManager deployment while > preserving HA metadata. > 2022-10-13 06:50:58,415 o.a.f.k.o.u.FlinkUtils [INFO > ][load-streaming/validator-process-124] Waiting for cluster shutdown... (5s) > 2022-10-13 06:51:03,451 o.a.f.k.o.u.FlinkUtils [INFO > ][load-streaming/validator-process-124] Waiting for cluster shutdown... (10s) > 2022-10-13 06:51:06,469 o.a.f.k.o.u.FlinkUtils [INFO > ][load-streaming/validator-process-124] Cluster shutdown completed. > 2022-10-13 06:51:06,470 o.a.f.k.o.c.FlinkDeploymentController [INFO > ][load-streaming/validator-process-124] End of reconciliation > 2022-10-13 06:51:06,493 o.a.f.k.o.c.FlinkDeploymentController [INFO > ][load-streaming/validator-process-124] Starting reconciliation > 2022-10-13 06:51:06,494 o.a.f.k.o.c.FlinkConfigManager [INFO > ][load-streaming/validator-process-124] Generating new config > {code} > In this scenario I see that the job manager and task manager pods are > terminated and then recreated. > > > Failure: > This happens when I let the job start (wait more than 30-60 seconds) and > change the configuration: > {code:java} > 2022-10-13 06:53:06,637 o.a.f.k.o.r.d.AbstractJobReconciler [INFO > ][load-streaming/validator-process-124] Upgrading/Restarting running job, > suspending first... > 2022-10-13 06:53:06,637 o.a.f.k.o.r.d.AbstractJobReconciler [INFO > ][load-streaming/validator-process-124] Job is in running state, ready for > upgrade with SAVEPOINT > 2022-10-13 06:53:06,659 o.a.f.k.o.s.FlinkService [INFO > ][load-streaming/validator-process-124] Suspending job with savepoint. > 2022-10-13 06:53:07,042 o.a.f.k.o.s.FlinkService [INFO > ][load-streaming/validator-process-124] Job successfully suspended with > savepoint > s3://cu-flink-load-checkpoints-us-east-1/validator-process-124/savepoints/savepoint-00-947975b509b2. > 2022-10-13 06:53:11,111 o.a.f.k.o.u.FlinkUtils [INFO > ][load-streaming/validator-process-124] Waiting for cluster shutdown... (5s) > 2022-10-13 06:53:16,176 o.a.f.k.o.u.FlinkUtils [INFO > ][load-streaming/validator-process-124] Waiting for cluster shutdown... (10s) > 2022-10-13 06:53:21,238 o.a.f.k.o.u.FlinkUtils [INFO > ][load-streaming/validator-process-124] Waiting for cluster shutdown... 
(15s) > 2022-10-13 06:53:26,293 o.a.f.k.o.u.FlinkUtils [INFO > ][load-streaming/validator-process-124] Waiting for cluster shutdown... (20s) > 2022-10-13 06:53:31,355 o.a.f.k.o.u.FlinkUtils [INFO > ][load-streaming/validator-process-124] Waiting for cluster shutdown... (25s) > 2022-10-13 06:53:36,412 o.a.f.k.o.u.FlinkUtils [INFO > ][load-streaming/validator-process-124] Waiting for cluster shutdown... (30s) > 2022-10-13 06:53:41,512 o.a.f.k.o.u.FlinkUtils [INFO > ][load-streaming/validator-process-124] Waiting for cluster shutdown... (35s) > 2022-10-13 06:53:46,568 o.a.f.k.o.u.FlinkUtils [INFO > ][load-streaming/validator-process-124] Waiting for cluster shutdown... (40s) > 2022-10-13 06:53:51,625 o.a.f.k.o.u.FlinkUtils [INFO > ][load-streaming/validator-process-124] Waiting for cluster shutdown... (45s) > 2022-10-13 06:53:56,740 o.a.f.k.o.u.FlinkUtils [INFO > ][load-streaming/validator-process-124] Waiting for cluster shutdown... (50s) > 2022-10-13 06:54:01,811 o.a.f.k.o.u.FlinkUtils [INFO > ][load-streaming/validator-process-124] Waiting for cluster shutdown... (55s) > 2022-10-1
[GitHub] [flink] ZmmBigdata commented on pull request #20983: [FLINK-29458][docs] When two tables have the same field, do not speci…
ZmmBigdata commented on PR #20983: URL: https://github.com/apache/flink/pull/20983#issuecomment-1279766335 > @ZmmBigdata Thanks for fixing this! It's better to fix the chinese version (though it hasn't been translated yet) `docs/content.zh/docs/dev/table/sql/queries/joins.md` as well. @lincoln-lil Can you merge this first? I'll change the Chinese document; I want to be a contributor, haha
[jira] [Commented] (FLINK-29458) When two tables have the same field, do not specify the table name,Exception will be thrown:SqlValidatorException :Column 'currency' is ambiguous
[ https://issues.apache.org/jira/browse/FLINK-29458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17618116#comment-17618116 ] ZuoYan commented on FLINK-29458: [~jark] It has been modified. Please refer to [GitHub Pull Request #20983|https://github.com/apache/flink/pull/20983] > When two tables have the same field, do not specify the table name,Exception > will be thrown:SqlValidatorException :Column 'currency' is ambiguous > - > > Key: FLINK-29458 > URL: https://issues.apache.org/jira/browse/FLINK-29458 > Project: Flink > Issue Type: Improvement > Components: Documentation, Table SQL / API >Affects Versions: 1.14.4 >Reporter: ZuoYan >Assignee: ZuoYan >Priority: Major > Labels: pull-request-available > Attachments: image-2022-09-28-21-00-01-302.png, > image-2022-09-28-21-00-09-054.png, image-2022-09-28-21-00-22-733.png > > > When two tables are joined and have the same field, an exception will be thrown > if the table name is not specified in the select query. Exception content: > Column 'currency' is ambiguous. > !image-2022-09-28-21-00-22-733.png! > > !image-2022-09-28-21-00-01-302.png! > !image-2022-09-28-21-00-09-054.png!
[GitHub] [flink] WencongLiu commented on pull request #21076: [FLINK-29646][sql-gateway] rest endpoint return a simpler error message
WencongLiu commented on PR #21076: URL: https://github.com/apache/flink/pull/21076#issuecomment-1279753603 Thanks for your contribution @huyuanfeng2018. I think this pull request should be merged into master branch rather than release-1.16.
[jira] [Commented] (FLINK-29646) SQL Gateway should return a simpler error message
[ https://issues.apache.org/jira/browse/FLINK-29646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17618104#comment-17618104 ] Wencong Liu commented on FLINK-29646: - Thanks [~heigebupahei] . +1 for this change. At present, sql gateway rest endpoint returns all the call stacks of the exception to the client if there is an internal error, which needs to be trimmed to make the root cause more clear. cc [~fsk119] [~xtsong] > SQL Gateway should return a simpler error message > - > > Key: FLINK-29646 > URL: https://issues.apache.org/jira/browse/FLINK-29646 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Gateway >Affects Versions: 1.16.0 >Reporter: yuanfenghu >Priority: Major > Labels: pull-request-available > Fix For: 1.16.0 > > > sql gateway should return simpler exception information > for example: > If i execute a sql statement through sql gateway but my statement has > syntax error :[ inset into tablea select * from tableb ] > When I get exception information. The abnormal information returned by the > server is too redundant to quickly find the Key Information. > {code:java} > org.apache.flink.table.gateway.api.utils.SqlGatewayException: > org.apache.flink.table.gateway.api.utils.SqlGatewayException: Failed to > fetchResults. > at > org.apache.flink.table.gateway.rest.handler.statement.FetchResultsHandler.handleRequest(FetchResultsHandler.java:77) > at > org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler.respondToRequest(AbstractSqlGatewayRestHandler.java:84) > at > org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler.respondToRequest(AbstractSqlGatewayRestHandler.java:52) > at > org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:196) > at > org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:83) > at java.util.Optional.ifPresent(Optional.java:159) > at > org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:45) > at > org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:80) > at > org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:49) > at > org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) > at > org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:115) > at > org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:94) > at > org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:55) > at > org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) > at > 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) > at > org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) > at > org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:210) > at > org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:69) > at > org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) > at > org.apache.flink.shaded.netty4.io.net
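As an illustration of the kind of trimming being asked for here (not the change proposed in the PR), the root cause of a chain like the one above can be surfaced by walking `getCause()` down to the deepest throwable before building the REST response:

{code:java}
// Illustrative only: walk the cause chain and report just the deepest message,
// instead of serialising the full call stacks of every wrapping exception.
public final class RootCauseUtil {

    private RootCauseUtil() {}

    public static String rootCauseMessage(Throwable t) {
        Throwable current = t;
        while (current.getCause() != null && current.getCause() != current) {
            current = current.getCause();
        }
        return current.getClass().getSimpleName() + ": " + current.getMessage();
    }
}
{code}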
[GitHub] [flink-connector-dynamodb] darenwkt opened a new pull request, #10: [FLINK-29310] Added license check
darenwkt opened a new pull request, #10: URL: https://github.com/apache/flink-connector-dynamodb/pull/10 Added maven plugin to use in ci workflow for license check. Note: - License check currently runs on mvn install directory, which will need to be updated when switched to deploy mode. - We are using flink-ci-tools dependency which is only available as SNAPSHOT at the moment. https://repository.apache.org/content/repositories/snapshots/org/apache/flink/flink-ci-tools/ Previous closed PR: https://github.com/apache/flink-connector-dynamodb/pull/8. Created new PR to update branch to pull from.
[GitHub] [flink-connector-dynamodb] darenwkt closed pull request #8: [FLINK-29310] Added license check
darenwkt closed pull request #8: [FLINK-29310] Added license check URL: https://github.com/apache/flink-connector-dynamodb/pull/8
[jira] [Commented] (FLINK-10989) OrcRowInputFormat uses two different file systems
[ https://issues.apache.org/jira/browse/FLINK-10989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17618088#comment-17618088 ] Angelo Kastroulis commented on FLINK-10989: --- At the very minimum, this should be documented in one of the numerous places in the docs that tell you how to implement S3 in Flink. As it sits, those docs are inaccurate because they say nothing about the incompatibility between the filesystem and the formats and make it sound like all you have to do is put the filesystem plugin in the right place and you're set. You're not set at all if you want to use a sane option like parquet or orc. This it not a minor issue (unless folks just don't use S3 with formats with Flink). > OrcRowInputFormat uses two different file systems > - > > Key: FLINK-10989 > URL: https://issues.apache.org/jira/browse/FLINK-10989 > Project: Flink > Issue Type: Bug > Components: Connectors / ORC >Affects Versions: 1.7.0 >Reporter: Till Rohrmann >Priority: Not a Priority > Labels: auto-deprioritized-major, auto-deprioritized-minor > > The {{OrcRowInputFormat}} seems to use two different {{FileSystem}}. The > Flink {{FileSystem}} for listing the files and generating the {{InputSplits}} > and then Hadoop's {{FileSystem}} to actually read the input splits. This can > be problematic if one only configures Flink's S3 {{FileSystem}} but does not > provide a S3 implementation for Hadoop's {{FileSystem}}. > I think this is not an intuitive behaviour and can lead to hard to debug > problems for a user. -- This message was sent by Atlassian Jira (v8.20.10#820010)
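A conceptual sketch of the two code paths the issue describes. The calls are illustrative, not the actual `OrcRowInputFormat` internals; the point is only that S3 support has to be provided twice, once for Flink's `FileSystem` abstraction and once for Hadoop's.

{code:java}
// Conceptual sketch of the mismatch described above (paths and calls are
// illustrative): split generation goes through Flink's pluggable FileSystem,
// while reading the ORC file goes through Hadoop's FileSystem.
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

public class TwoFileSystemsSketch {
    public static void main(String[] args) throws Exception {
        Path path = new Path("s3://bucket/warehouse/orc-table/");

        // 1) Listing / split generation: resolved via Flink's file system plugins
        //    (e.g. flink-s3-fs-hadoop or flink-s3-fs-presto in plugins/).
        FileSystem flinkFs = path.getFileSystem();
        System.out.println(flinkFs.listStatus(path).length);

        // 2) Reading the split: the ORC reader uses org.apache.hadoop.fs.FileSystem,
        //    which resolves "s3://" from the Hadoop configuration on the classpath -
        //    a separate mechanism that the Flink S3 plugin does not cover.
        org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
        org.apache.hadoop.fs.FileSystem hadoopFs =
                new org.apache.hadoop.fs.Path(path.toUri()).getFileSystem(hadoopConf);
        System.out.println(hadoopFs.exists(new org.apache.hadoop.fs.Path(path.toUri())));
    }
}
{code}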
[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)
zentol commented on code in PR #20757: URL: https://github.com/apache/flink/pull/20757#discussion_r996303463 ## docs/content/docs/connectors/datastream/datagen.md: ## @@ -0,0 +1,115 @@ +--- +title: DataGen +weight: 3 +type: docs +--- + + +# DataGen Connector + +The DataGen connector provides a `Source` implementation that allows for generating input data for +Flink pipelines. +It is useful when developing locally or demoing without access to external systems such as Kafka. +The DataGen connector is built-in, no additional dependencies are required. + +Usage +- + +The `DataGeneratorSource` produces N data points in parallel. The source splits the sequence +into as many parallel sub-sequences as there are parallel source subtasks. It drives the data +generation process by supplying "index" values of type Long to the user-provided +{{< javadoc name="GeneratorFunction" file="org/apache/flink/connector/datagen/source/GeneratorFunction.html" >}}. + +The `GeneratorFunction` is then used for mapping the (sub-)sequences of Long values +into the generated events of an arbitrary data type. For instance, the following code will produce the sequence of +`["Number: 0", "Number: 2", ... , "Number: 999"]` records. + +```java +GeneratorFunction generatorFunction = index -> "Number: " + index; + +DataGeneratorSource source = +new DataGeneratorSource<>(generatorFunction, 1000, Types.STRING); + +DataStreamSource stream = +env.fromSource(source, +WatermarkStrategy.noWatermarks(), +"Generator Source"); +``` + +The order of elements depends on the parallelism. Each sub-sequence will be produced in order. +Consequently, if the parallelism is limited to one, this will produce one sequence in order from +`"Number: 0"` to `"Number: 999"`. + +`DataGeneratorSource` has built-in support for rate limiting. The following code will produce an +effectively unbounded (`Long.MAX_VALUE` from a practical perspective will never be reached) stream of +Long values at the overall source rate (across all source subtasks) not exceeding 100 events per second. + +```java +GeneratorFunction generatorFunction = index -> index; + +DataGeneratorSource source = +new DataGeneratorSource<>( + generatorFunctionStateless, + Long.MAX_VALUE, + RateLimiterStrategy.perSecond(100), + Types.STRING); +``` + +The source also allows for producing specific elements between the checkpoint boundaries using the Review Comment: Urgh but an `ExternallyInducedSourceReader` has to provide the checkpoint IDs which we'd have to checkpoint, so we'd also need a wrapper around the NumberSequenceSplits... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)
zentol commented on code in PR #20757: URL: https://github.com/apache/flink/pull/20757#discussion_r996305134 ## docs/content/docs/connectors/datastream/datagen.md: ## @@ -0,0 +1,115 @@ +--- +title: DataGen +weight: 3 +type: docs +--- + + +# DataGen Connector + +The DataGen connector provides a `Source` implementation that allows for generating input data for +Flink pipelines. +It is useful when developing locally or demoing without access to external systems such as Kafka. +The DataGen connector is built-in, no additional dependencies are required. + +Usage +- + +The `DataGeneratorSource` produces N data points in parallel. The source splits the sequence +into as many parallel sub-sequences as there are parallel source subtasks. It drives the data +generation process by supplying "index" values of type Long to the user-provided +{{< javadoc name="GeneratorFunction" file="org/apache/flink/connector/datagen/source/GeneratorFunction.html" >}}. + +The `GeneratorFunction` is then used for mapping the (sub-)sequences of Long values +into the generated events of an arbitrary data type. For instance, the following code will produce the sequence of +`["Number: 0", "Number: 2", ... , "Number: 999"]` records. + +```java +GeneratorFunction generatorFunction = index -> "Number: " + index; + +DataGeneratorSource source = +new DataGeneratorSource<>(generatorFunction, 1000, Types.STRING); + +DataStreamSource stream = +env.fromSource(source, +WatermarkStrategy.noWatermarks(), +"Generator Source"); +``` + +The order of elements depends on the parallelism. Each sub-sequence will be produced in order. +Consequently, if the parallelism is limited to one, this will produce one sequence in order from +`"Number: 0"` to `"Number: 999"`. + +`DataGeneratorSource` has built-in support for rate limiting. The following code will produce an +effectively unbounded (`Long.MAX_VALUE` from a practical perspective will never be reached) stream of +Long values at the overall source rate (across all source subtasks) not exceeding 100 events per second. + +```java +GeneratorFunction generatorFunction = index -> index; + +DataGeneratorSource source = +new DataGeneratorSource<>( + generatorFunctionStateless, + Long.MAX_VALUE, + RateLimiterStrategy.perSecond(100), + Types.STRING); +``` + +The source also allows for producing specific elements between the checkpoint boundaries using the Review Comment: The `ExternallyInducedSourceReader` route would add so much complexity that I'm not sure if we should pursue it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-29646) SQL Gateway should return a simpler error message
[ https://issues.apache.org/jira/browse/FLINK-29646?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yuanfenghu updated FLINK-29646: --- Priority: Major (was: Critical) > SQL Gateway should return a simpler error message > - > > Key: FLINK-29646 > URL: https://issues.apache.org/jira/browse/FLINK-29646 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Gateway >Affects Versions: 1.16.0 >Reporter: yuanfenghu >Priority: Major > Labels: pull-request-available > Fix For: 1.16.0 > > > sql gateway should return simpler exception information > for example: > If i execute a sql statement through sql gateway but my statement has > syntax error :[ inset into tablea select * from tableb ] > When I get exception information. The abnormal information returned by the > server is too redundant to quickly find the Key Information. > {code:java} > org.apache.flink.table.gateway.api.utils.SqlGatewayException: > org.apache.flink.table.gateway.api.utils.SqlGatewayException: Failed to > fetchResults. > at > org.apache.flink.table.gateway.rest.handler.statement.FetchResultsHandler.handleRequest(FetchResultsHandler.java:77) > at > org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler.respondToRequest(AbstractSqlGatewayRestHandler.java:84) > at > org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler.respondToRequest(AbstractSqlGatewayRestHandler.java:52) > at > org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:196) > at > org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:83) > at java.util.Optional.ifPresent(Optional.java:159) > at > org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:45) > at > org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:80) > at > org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:49) > at > org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) > at > org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:115) > at > org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:94) > at > org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:55) > at > org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) > at > 
org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) > at > org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:210) > at > org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:69) > at > org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) > at > org.apache.flink.shaded.n
[jira] [Updated] (FLINK-29646) SQL Gateway should return a simpler error message
[ https://issues.apache.org/jira/browse/FLINK-29646?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yuanfenghu updated FLINK-29646: --- Priority: Critical (was: Minor) > SQL Gateway should return a simpler error message > - > > Key: FLINK-29646 > URL: https://issues.apache.org/jira/browse/FLINK-29646 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Gateway >Affects Versions: 1.16.0 >Reporter: yuanfenghu >Priority: Critical > Labels: pull-request-available > Fix For: 1.16.0 > > > sql gateway should return simpler exception information > for example: > If i execute a sql statement through sql gateway but my statement has > syntax error :[ inset into tablea select * from tableb ] > When I get exception information. The abnormal information returned by the > server is too redundant to quickly find the Key Information. > {code:java} > org.apache.flink.table.gateway.api.utils.SqlGatewayException: > org.apache.flink.table.gateway.api.utils.SqlGatewayException: Failed to > fetchResults. > at > org.apache.flink.table.gateway.rest.handler.statement.FetchResultsHandler.handleRequest(FetchResultsHandler.java:77) > at > org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler.respondToRequest(AbstractSqlGatewayRestHandler.java:84) > at > org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler.respondToRequest(AbstractSqlGatewayRestHandler.java:52) > at > org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:196) > at > org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:83) > at java.util.Optional.ifPresent(Optional.java:159) > at > org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:45) > at > org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:80) > at > org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:49) > at > org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) > at > org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:115) > at > org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:94) > at > org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:55) > at > org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) > at > 
org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) > at > org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:210) > at > org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:69) > at > org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) > at > org.apache.flink.shade
[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)
zentol commented on code in PR #20757: URL: https://github.com/apache/flink/pull/20757#discussion_r996303463 ## docs/content/docs/connectors/datastream/datagen.md: ## @@ -0,0 +1,115 @@ +--- +title: DataGen +weight: 3 +type: docs +--- + + +# DataGen Connector + +The DataGen connector provides a `Source` implementation that allows for generating input data for +Flink pipelines. +It is useful when developing locally or demoing without access to external systems such as Kafka. +The DataGen connector is built-in, no additional dependencies are required. + +Usage +- + +The `DataGeneratorSource` produces N data points in parallel. The source splits the sequence +into as many parallel sub-sequences as there are parallel source subtasks. It drives the data +generation process by supplying "index" values of type Long to the user-provided +{{< javadoc name="GeneratorFunction" file="org/apache/flink/connector/datagen/source/GeneratorFunction.html" >}}. + +The `GeneratorFunction` is then used for mapping the (sub-)sequences of Long values +into the generated events of an arbitrary data type. For instance, the following code will produce the sequence of +`["Number: 0", "Number: 2", ... , "Number: 999"]` records. + +```java +GeneratorFunction generatorFunction = index -> "Number: " + index; + +DataGeneratorSource source = +new DataGeneratorSource<>(generatorFunction, 1000, Types.STRING); + +DataStreamSource stream = +env.fromSource(source, +WatermarkStrategy.noWatermarks(), +"Generator Source"); +``` + +The order of elements depends on the parallelism. Each sub-sequence will be produced in order. +Consequently, if the parallelism is limited to one, this will produce one sequence in order from +`"Number: 0"` to `"Number: 999"`. + +`DataGeneratorSource` has built-in support for rate limiting. The following code will produce an +effectively unbounded (`Long.MAX_VALUE` from a practical perspective will never be reached) stream of +Long values at the overall source rate (across all source subtasks) not exceeding 100 events per second. + +```java +GeneratorFunction generatorFunction = index -> index; + +DataGeneratorSource source = +new DataGeneratorSource<>( + generatorFunctionStateless, + Long.MAX_VALUE, + RateLimiterStrategy.perSecond(100), + Types.STRING); +``` + +The source also allows for producing specific elements between the checkpoint boundaries using the Review Comment: Urgh but an `ExternallyInducedSourceReader` has to provide the checkpoint IDs so that'd need checkpointing, so we'd also need a wrapper around the NumberSequenceSplits... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot commented on pull request #21076: [FLINK-29646][sql-gateway] rest endpoint return a simpler error message
flinkbot commented on PR #21076: URL: https://github.com/apache/flink/pull/21076#issuecomment-1279741271 ## CI report: * 0a78909554fce3c02ee4ee35df48c0d7dacd6d59 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)
zentol commented on code in PR #20757: URL: https://github.com/apache/flink/pull/20757#discussion_r996303463 ## docs/content/docs/connectors/datastream/datagen.md: ## @@ -0,0 +1,115 @@ +--- +title: DataGen +weight: 3 +type: docs +--- + + +# DataGen Connector + +The DataGen connector provides a `Source` implementation that allows for generating input data for +Flink pipelines. +It is useful when developing locally or demoing without access to external systems such as Kafka. +The DataGen connector is built-in, no additional dependencies are required. + +Usage +- + +The `DataGeneratorSource` produces N data points in parallel. The source splits the sequence +into as many parallel sub-sequences as there are parallel source subtasks. It drives the data +generation process by supplying "index" values of type Long to the user-provided +{{< javadoc name="GeneratorFunction" file="org/apache/flink/connector/datagen/source/GeneratorFunction.html" >}}. + +The `GeneratorFunction` is then used for mapping the (sub-)sequences of Long values +into the generated events of an arbitrary data type. For instance, the following code will produce the sequence of +`["Number: 0", "Number: 2", ... , "Number: 999"]` records. + +```java +GeneratorFunction generatorFunction = index -> "Number: " + index; + +DataGeneratorSource source = +new DataGeneratorSource<>(generatorFunction, 1000, Types.STRING); + +DataStreamSource stream = +env.fromSource(source, +WatermarkStrategy.noWatermarks(), +"Generator Source"); +``` + +The order of elements depends on the parallelism. Each sub-sequence will be produced in order. +Consequently, if the parallelism is limited to one, this will produce one sequence in order from +`"Number: 0"` to `"Number: 999"`. + +`DataGeneratorSource` has built-in support for rate limiting. The following code will produce an +effectively unbounded (`Long.MAX_VALUE` from a practical perspective will never be reached) stream of +Long values at the overall source rate (across all source subtasks) not exceeding 100 events per second. + +```java +GeneratorFunction generatorFunction = index -> index; + +DataGeneratorSource source = +new DataGeneratorSource<>( + generatorFunctionStateless, + Long.MAX_VALUE, + RateLimiterStrategy.perSecond(100), + Types.STRING); +``` + +The source also allows for producing specific elements between the checkpoint boundaries using the Review Comment: Urgh but an `ExternallyInducedSourceReader` has to provide the checkpoint IDs so that'd need checkpointing... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-29646) SQL Gateway should return a simpler error message
[ https://issues.apache.org/jira/browse/FLINK-29646?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-29646: --- Labels: pull-request-available (was: ) > SQL Gateway should return a simpler error message > - > > Key: FLINK-29646 > URL: https://issues.apache.org/jira/browse/FLINK-29646 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Gateway >Affects Versions: 1.16.0 >Reporter: yuanfenghu >Priority: Minor > Labels: pull-request-available > Fix For: 1.16.0 > > > sql gateway should return simpler exception information > for example: > If i execute a sql statement through sql gateway but my statement has > syntax error :[ inset into tablea select * from tableb ] > When I get exception information. The abnormal information returned by the > server is too redundant to quickly find the Key Information. > {code:java} > org.apache.flink.table.gateway.api.utils.SqlGatewayException: > org.apache.flink.table.gateway.api.utils.SqlGatewayException: Failed to > fetchResults. > at > org.apache.flink.table.gateway.rest.handler.statement.FetchResultsHandler.handleRequest(FetchResultsHandler.java:77) > at > org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler.respondToRequest(AbstractSqlGatewayRestHandler.java:84) > at > org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler.respondToRequest(AbstractSqlGatewayRestHandler.java:52) > at > org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:196) > at > org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:83) > at java.util.Optional.ifPresent(Optional.java:159) > at > org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:45) > at > org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:80) > at > org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:49) > at > org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) > at > org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:115) > at > org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:94) > at > org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:55) > at > org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) > at > 
org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) > at > org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:210) > at > org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:69) > at > org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) > at > org.apache
[GitHub] [flink] huyuanfeng2018 opened a new pull request, #21076: [FLINK-29646][sql-gateway] rest endpoint return a simpler error message
huyuanfeng2018 opened a new pull request, #21076: URL: https://github.com/apache/flink/pull/21076 ## What is the purpose of the change This pull request makes the SQL Gateway return only the key error information instead of the full exception stack when an operation fails. ## Brief change log 1. Modify the method `ExceptionUtils.stringifyException` and add a parameter to return only the root-cause information 2. Add a `handleException` method to `AbstractSqlGatewayRestHandler` to catch SqlGatewayException so that only the key information is returned to the client ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
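For readers following the change above: the general idea of surfacing only the root cause rather than the whole stack can be sketched with nothing but the plain `java.lang.Throwable` API. This is an illustrative sketch only, not the actual patch; the class name, method name, and the sample exception chain in `main` are hypothetical, not taken from the PR.

```java
/**
 * Illustrative sketch only (hypothetical names, not the PR's code): walks the cause
 * chain of a throwable and renders just the innermost "Class: message" string, which
 * is the kind of condensed output the PR description aims for.
 */
public final class RootCauseMessages {

    private RootCauseMessages() {}

    /** Returns "fully.qualified.ExceptionClass: message" of the root cause of {@code t}. */
    public static String stringifyRootCause(Throwable t) {
        Throwable root = t;
        while (root.getCause() != null && root.getCause() != root) {
            root = root.getCause();
        }
        String message = root.getMessage() == null ? "(no message)" : root.getMessage();
        return root.getClass().getName() + ": " + message;
    }

    public static void main(String[] args) {
        // Hypothetical nested failure, loosely modeled on the gateway example in FLINK-29646.
        Throwable nested =
                new RuntimeException(
                        "Failed to fetchResults.",
                        new IllegalArgumentException("SQL parse error near 'inset'"));
        // Prints: java.lang.IllegalArgumentException: SQL parse error near 'inset'
        System.out.println(stringifyRootCause(nested));
    }
}
```

Whether this trimming lives in `ExceptionUtils` or in the REST handler is exactly the design choice the pull request has to settle.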
[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)
zentol commented on code in PR #20757: URL: https://github.com/apache/flink/pull/20757#discussion_r996303005 ## docs/content/docs/connectors/datastream/datagen.md: ## @@ -0,0 +1,115 @@ +--- +title: DataGen +weight: 3 +type: docs +--- + + +# DataGen Connector + +The DataGen connector provides a `Source` implementation that allows for generating input data for +Flink pipelines. +It is useful when developing locally or demoing without access to external systems such as Kafka. +The DataGen connector is built-in, no additional dependencies are required. + +Usage +- + +The `DataGeneratorSource` produces N data points in parallel. The source splits the sequence +into as many parallel sub-sequences as there are parallel source subtasks. It drives the data +generation process by supplying "index" values of type Long to the user-provided +{{< javadoc name="GeneratorFunction" file="org/apache/flink/connector/datagen/source/GeneratorFunction.html" >}}. + +The `GeneratorFunction` is then used for mapping the (sub-)sequences of Long values +into the generated events of an arbitrary data type. For instance, the following code will produce the sequence of +`["Number: 0", "Number: 2", ... , "Number: 999"]` records. + +```java +GeneratorFunction generatorFunction = index -> "Number: " + index; + +DataGeneratorSource source = +new DataGeneratorSource<>(generatorFunction, 1000, Types.STRING); + +DataStreamSource stream = +env.fromSource(source, +WatermarkStrategy.noWatermarks(), +"Generator Source"); +``` + +The order of elements depends on the parallelism. Each sub-sequence will be produced in order. +Consequently, if the parallelism is limited to one, this will produce one sequence in order from +`"Number: 0"` to `"Number: 999"`. + +`DataGeneratorSource` has built-in support for rate limiting. The following code will produce an +effectively unbounded (`Long.MAX_VALUE` from a practical perspective will never be reached) stream of +Long values at the overall source rate (across all source subtasks) not exceeding 100 events per second. + +```java +GeneratorFunction generatorFunction = index -> index; + +DataGeneratorSource source = +new DataGeneratorSource<>( + generatorFunctionStateless, + Long.MAX_VALUE, + RateLimiterStrategy.perSecond(100), + Types.STRING); +``` + +The source also allows for producing specific elements between the checkpoint boundaries using the Review Comment: I'm not sure yet. But we have some time before the merge anyhow due to the japicmp issue. One issue is that controlling checkpointing should be optional, but once you extend the `ExternallyInducedSourceReader` it's always enabled. So we can't just add it to the `RateLimitedSourceReader`, but rather have to wrap it based on some condition. Beyond that, I'm wondering if this should be part of the `RateLimiter` (which then be more of a `RateController`) or a separate thing that provides a lower bound while the rate limiter provides an upper bound (with the problem that if they conflict weird things will happen). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
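To make the design question above more concrete, here is a minimal sketch of the kind of wrapper being discussed: a delegating reader that implements `ExternallyInducedSourceReader` and asks the runtime to trigger a checkpoint after a fixed number of polls. This is an assumption-laden illustration, not part of the PR; it assumes the `shouldTriggerCheckpoint()` contract of returning an `Optional<Long>` checkpoint ID, the class name and the naive ID counter are hypothetical, and that counter is precisely the state that would have to be folded into the wrapper around the NumberSequenceSplits mentioned earlier in the thread.

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.api.connector.source.ExternallyInducedSourceReader;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.core.io.InputStatus;

/** Hypothetical wrapper: triggers a checkpoint after every {@code recordsPerCheckpoint} polls. */
class FixedRecordsPerCheckpointReader<E, SplitT extends SourceSplit>
        implements ExternallyInducedSourceReader<E, SplitT> {

    private final SourceReader<E, SplitT> delegate;
    private final long recordsPerCheckpoint;

    private long polledSinceLastCheckpoint;
    // Assumption: IDs are simply counted up here; in reality this bookkeeping would
    // have to be checkpointed (the wrapper-around-the-splits problem noted above).
    private long nextCheckpointId = 1L;

    FixedRecordsPerCheckpointReader(SourceReader<E, SplitT> delegate, long recordsPerCheckpoint) {
        this.delegate = delegate;
        this.recordsPerCheckpoint = recordsPerCheckpoint;
    }

    @Override
    public Optional<Long> shouldTriggerCheckpoint() {
        if (polledSinceLastCheckpoint >= recordsPerCheckpoint) {
            polledSinceLastCheckpoint = 0;
            return Optional.of(nextCheckpointId++);
        }
        return Optional.empty();
    }

    @Override
    public InputStatus pollNext(ReaderOutput<E> output) throws Exception {
        InputStatus status = delegate.pollNext(output);
        polledSinceLastCheckpoint++; // simplification: counts polls, not emitted records
        return status;
    }

    @Override
    public void start() {
        delegate.start();
    }

    @Override
    public List<SplitT> snapshotState(long checkpointId) {
        return delegate.snapshotState(checkpointId);
    }

    @Override
    public CompletableFuture<Void> isAvailable() {
        return delegate.isAvailable();
    }

    @Override
    public void addSplits(List<SplitT> splits) {
        delegate.addSplits(splits);
    }

    @Override
    public void notifyNoMoreSplits() {
        delegate.notifyNoMoreSplits();
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        delegate.notifyCheckpointComplete(checkpointId);
    }

    @Override
    public void close() throws Exception {
        delegate.close();
    }
}
```

The sketch also shows why making this optional is awkward: once a reader implements `ExternallyInducedSourceReader`, externally induced checkpointing is always on, which is the conditional-wrapping concern raised in the comment above.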
[GitHub] [flink] afedulov commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)
afedulov commented on code in PR #20757: URL: https://github.com/apache/flink/pull/20757#discussion_r996297824 ## docs/content/docs/connectors/datastream/datagen.md: ## @@ -0,0 +1,115 @@ +--- +title: DataGen +weight: 3 +type: docs +--- + + +# DataGen Connector + +The DataGen connector provides a `Source` implementation that allows for generating input data for +Flink pipelines. +It is useful when developing locally or demoing without access to external systems such as Kafka. +The DataGen connector is built-in, no additional dependencies are required. + +Usage +- + +The `DataGeneratorSource` produces N data points in parallel. The source splits the sequence +into as many parallel sub-sequences as there are parallel source subtasks. It drives the data +generation process by supplying "index" values of type Long to the user-provided +{{< javadoc name="GeneratorFunction" file="org/apache/flink/connector/datagen/source/GeneratorFunction.html" >}}. + +The `GeneratorFunction` is then used for mapping the (sub-)sequences of Long values +into the generated events of an arbitrary data type. For instance, the following code will produce the sequence of +`["Number: 0", "Number: 2", ... , "Number: 999"]` records. + +```java +GeneratorFunction generatorFunction = index -> "Number: " + index; + +DataGeneratorSource source = +new DataGeneratorSource<>(generatorFunction, 1000, Types.STRING); + +DataStreamSource stream = +env.fromSource(source, +WatermarkStrategy.noWatermarks(), +"Generator Source"); +``` + +The order of elements depends on the parallelism. Each sub-sequence will be produced in order. +Consequently, if the parallelism is limited to one, this will produce one sequence in order from +`"Number: 0"` to `"Number: 999"`. + +`DataGeneratorSource` has built-in support for rate limiting. The following code will produce an +effectively unbounded (`Long.MAX_VALUE` from a practical perspective will never be reached) stream of +Long values at the overall source rate (across all source subtasks) not exceeding 100 events per second. + +```java +GeneratorFunction generatorFunction = index -> index; + +DataGeneratorSource source = +new DataGeneratorSource<>( + generatorFunctionStateless, + Long.MAX_VALUE, + RateLimiterStrategy.perSecond(100), + Types.STRING); +``` + +The source also allows for producing specific elements between the checkpoint boundaries using the Review Comment: > There's an ExternallyInducedSourceReader that can control when checkpoints occur. Nice :+1:. Do you propose to add it to the `RateLimitedSourceReader` now or to address it as a follow-up improvement? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-29557) The SinkOperator's OutputFormat function is not recognized
[ https://issues.apache.org/jira/browse/FLINK-29557?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17618052#comment-17618052 ] Aitozi commented on FLINK-29557: Thanks [~gaoyunhaii] for your reply. The {{OutputFormatProvider}} in the table module generates the {{SinkOperator}}, and a {{SinkOperator}} wrapping an {{OutputFormatSinkFunction}} is not treated as an operator with an {{OutputFormat}} by {{SimpleOperatorFactory}}. In the end, it misses the chance to execute the hook methods, e.g. {{InitializeOnMaster}}. > The SinkOperator's OutputFormat function is not recognized > -- > > Key: FLINK-29557 > URL: https://issues.apache.org/jira/browse/FLINK-29557 > Project: Flink > Issue Type: Bug > Components: API / Core, Table SQL / API >Reporter: Aitozi >Priority: Major > > In {{SimpleOperatorFactory#of}}, only {{StreamSink}} is handled and > registered as a {{SimpleOutputFormatOperatorFactory}}. As a result, the output > format information in {{SinkOperator}} is lost, and hook functions like > {{FinalizeOnMaster}} never get a chance to be executed. > Because the {{SinkOperator}} lives in the table module, it can't be referenced > directly from {{flink-streaming-java}}. So maybe we need to introduce an extra > common class, e.g. {{SinkFunctionOperator}}, to describe the {{Sink}} operator > and handle it individually. -- This message was sent by Atlassian Jira (v8.20.10#820010)
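To illustrate what is at stake (a hypothetical example, not code from the issue): an {{OutputFormat}} can opt into the master-side hooks via the {{InitializeOnMaster}}/{{FinalizeOnMaster}} interfaces, and it is exactly these calls that would be skipped if, as described above, the factory created by {{SimpleOperatorFactory#of}} does not expose the format behind the table runtime's {{SinkOperator}}.

{code:java}
import java.io.IOException;

import org.apache.flink.api.common.io.FinalizeOnMaster;
import org.apache.flink.api.common.io.InitializeOnMaster;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.configuration.Configuration;

/**
 * Hypothetical OutputFormat relying on the master-side hooks discussed in this issue.
 * If the surrounding operator factory does not report the format, initializeGlobal and
 * finalizeGlobal are never invoked.
 */
public class HookedOutputFormat implements OutputFormat<String>, InitializeOnMaster, FinalizeOnMaster {

    @Override
    public void initializeGlobal(int parallelism) throws IOException {
        // e.g. create a staging directory once, on the JobManager, before any subtask opens
    }

    @Override
    public void finalizeGlobal(int parallelism) throws IOException {
        // e.g. commit/rename staged files once, after all subtasks have finished
    }

    @Override
    public void configure(Configuration parameters) {
        // no-op for this illustration
    }

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        // open per-subtask resources
    }

    @Override
    public void writeRecord(String record) throws IOException {
        // write a single record
    }

    @Override
    public void close() throws IOException {
        // release per-subtask resources
    }
}
{code}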
[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)
zentol commented on code in PR #20757: URL: https://github.com/apache/flink/pull/20757#discussion_r996290206 ## docs/content/docs/connectors/datastream/datagen.md: ## @@ -0,0 +1,115 @@ +--- +title: DataGen +weight: 3 +type: docs +--- + + +# DataGen Connector + +The DataGen connector provides a `Source` implementation that allows for generating input data for +Flink pipelines. +It is useful when developing locally or demoing without access to external systems such as Kafka. +The DataGen connector is built-in, no additional dependencies are required. + +Usage +- + +The `DataGeneratorSource` produces N data points in parallel. The source splits the sequence +into as many parallel sub-sequences as there are parallel source subtasks. It drives the data +generation process by supplying "index" values of type Long to the user-provided +{{< javadoc name="GeneratorFunction" file="org/apache/flink/connector/datagen/source/GeneratorFunction.html" >}}. + +The `GeneratorFunction` is then used for mapping the (sub-)sequences of Long values +into the generated events of an arbitrary data type. For instance, the following code will produce the sequence of +`["Number: 0", "Number: 2", ... , "Number: 999"]` records. + +```java +GeneratorFunction generatorFunction = index -> "Number: " + index; + +DataGeneratorSource source = +new DataGeneratorSource<>(generatorFunction, 1000, Types.STRING); + +DataStreamSource stream = +env.fromSource(source, +WatermarkStrategy.noWatermarks(), +"Generator Source"); +``` + +The order of elements depends on the parallelism. Each sub-sequence will be produced in order. +Consequently, if the parallelism is limited to one, this will produce one sequence in order from +`"Number: 0"` to `"Number: 999"`. + +`DataGeneratorSource` has built-in support for rate limiting. The following code will produce an +effectively unbounded (`Long.MAX_VALUE` from a practical perspective will never be reached) stream of +Long values at the overall source rate (across all source subtasks) not exceeding 100 events per second. + +```java +GeneratorFunction generatorFunction = index -> index; + +DataGeneratorSource source = +new DataGeneratorSource<>( + generatorFunctionStateless, + Long.MAX_VALUE, + RateLimiterStrategy.perSecond(100), + Types.STRING); +``` + +The source also allows for producing specific elements between the checkpoint boundaries using the Review Comment: There's an `ExternallyInducedSourceReader` that can control when checkpoints occur. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] ashmeet-kandhari commented on pull request #20991: [FLINK-25538][flink-connector-kafka] JUnit5 Migration
ashmeet-kandhari commented on PR #20991: URL: https://github.com/apache/flink/pull/20991#issuecomment-1279718997 > Thanks for your contribution @ashmeet-kandhari As I mentioned in jira issue we could continue code related discussions here > > Could you please share more detail/link to ci with failure you've mentioned? also about changes: i think changes of `docs/themes/book` are not necessary for this PR and could be removed Hi @snuyanzin, I was testing the migration changes made to KafkaTableITCase and UpsertKafkaTableITCase locally; I have pushed them now. I will watch the CI again and let you know if it's failing. I will also revert the changes made in `docs/themes/book`; I'm not sure what caused them, as I have not touched that directory. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] zentol commented on pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)
zentol commented on PR #20757: URL: https://github.com/apache/flink/pull/20757#issuecomment-1279718382 We're running into a bug in the Japicmp plugin; I'll try to resolve that upstream. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] zentol commented on a diff in pull request #20757: [FLINK-27919] Add FLIP-27-based source for data generation (FLIP-238)
zentol commented on code in PR #20757: URL: https://github.com/apache/flink/pull/20757#discussion_r996284438 ## flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReaderBase.java: ## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.connector.source.lib.util; + +import org.apache.flink.annotation.Public; +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.core.io.InputStatus; + +import javax.annotation.Nullable; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A {@link SourceReader} that returns the values of an iterator, supplied via an {@link + * IteratorSourceSplit}. + * + * The {@code IteratorSourceSplit} is also responsible for taking the current iterator and + * turning it back into a split for checkpointing. + * + * @param The type of events returned by the reader. + * @param The type of the iterator that produces the events. This type exists to make the + * conversion between iterator and {@code IteratorSourceSplit} type safe. + * @param The concrete type of the {@code IteratorSourceSplit} that creates and converts + * the iterator that produces this reader's elements. + */ +@Public +public abstract class IteratorSourceReaderBase< +E, O, IterT extends Iterator, SplitT extends IteratorSourceSplit> +implements SourceReader { + +/** The context for this reader, to communicate with the enumerator. */ +private final SourceReaderContext context; + +/** The availability future. This reader is available as soon as a split is assigned. */ +private CompletableFuture availability; + +/** + * The iterator producing data. Non-null after a split has been assigned. This field is null or + * non-null always together with the {@link #currentSplit} field. + */ +@Nullable private IterT iterator; + +/** + * The split whose data we return. Non-null after a split has been assigned. This field is null + * or non-null always together with the {@link #iterator} field. + */ +@Nullable private SplitT currentSplit; + +/** The remaining splits that were assigned but not yet processed. 
*/ +private final Queue remainingSplits; + +private boolean noMoreSplits; + +public IteratorSourceReaderBase(SourceReaderContext context) { +this.context = checkNotNull(context); +this.availability = new CompletableFuture<>(); +this.remainingSplits = new ArrayDeque<>(); +} + +// + +@Override +public final void start() { Review Comment: I will remove the `final` flag because it's not source-compatible. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
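As a side note on why removing `final` matters for source compatibility (a plain-Java illustration, not Flink code): any existing subclass that already overrides the method would stop compiling once the supertype declares it final.

```java
// Plain-Java illustration of the source-compatibility concern; class names are made up.
abstract class BaseReader {
    public void start() {}          // non-final: subclasses may override
    // public final void start() {} // if declared final instead, CustomReader below no longer compiles
}

class CustomReader extends BaseReader {
    @Override
    public void start() {
        // user initialization logic that existed before the base class changed
    }
}
```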
[jira] [Updated] (FLINK-29648) "LocalDateTime not supported" error when retrieving Java TypeInformation from PyFlink
[ https://issues.apache.org/jira/browse/FLINK-29648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Juntao Hu updated FLINK-29648: -- Priority: Critical (was: Blocker) > "LocalDateTime not supported" error when retrieving Java TypeInformation from > PyFlink > - > > Key: FLINK-29648 > URL: https://issues.apache.org/jira/browse/FLINK-29648 > Project: Flink > Issue Type: Bug > Components: API / Python >Affects Versions: 1.16.0 >Reporter: Juntao Hu >Priority: Critical > Fix For: 1.16.0 > > > The following code raises "TypeError: The java type info: LocalDateTime is > not supported in PyFlink currently.": > {code:java} > t_env.to_data_stream(t).key_by(...){code} > However, this works: > {code:java} > t_env.to_data_stream(t).map(lambda r: r).key_by(...){code} > Although we add Python coders for LocalTimeTypeInfo in 1.16, there's no > corresponding typeinfo at Python side. So it works when a user immediately > does processing after to_data_stream since date/time data has already been > converted to Python object, but when key_by tries to retrieve typeinfo from > Java TypeInformation, it fails. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-29648) "LocalDateTime not supported" error when retrieving Java TypeInformation from PyFlink
Juntao Hu created FLINK-29648: - Summary: "LocalDateTime not supported" error when retrieving Java TypeInformation from PyFlink Key: FLINK-29648 URL: https://issues.apache.org/jira/browse/FLINK-29648 Project: Flink Issue Type: Bug Components: API / Python Affects Versions: 1.16.0 Reporter: Juntao Hu Fix For: 1.16.0 The following code raises "TypeError: The java type info: LocalDateTime is not supported in PyFlink currently.": {code:java} t_env.to_data_stream(t).key_by(...){code} However, this works: {code:java} t_env.to_data_stream(t).map(lambda r: r).key_by(...){code} Although we added Python coders for LocalTimeTypeInfo in 1.16, there is no corresponding type info on the Python side. So it works when a user immediately does processing after to_data_stream, since the date/time data has already been converted to Python objects, but when key_by tries to retrieve the type info from the Java TypeInformation, it fails. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-29608) Problem with add_sink(FlinkKafkaProducer()) after reduce when using the pyflink1.17dev datastream API
[ https://issues.apache.org/jira/browse/FLINK-29608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17618034#comment-17618034 ] Dian Fu edited comment on FLINK-29608 at 10/15/22 7:37 AM: --- >From the exception stack: {code:java} Caused by: java.lang.ClassCastException: class org.apache.flink.api.java.tuple.Tuple2 cannot be cast to class org.apache.flink.types.Row (org.apache.flink.api.java.tuple.Tuple2 and org.apache.flink.types.Row are in unnamed module of loader 'app')* {code} I believe that the result type of result datastream is Tuple, however the sink accepts Row type as inputs and so you need to convert the result type of the result datastream into Row type before writing it into sink. Could refer to https://nightlies.apache.org/flink/flink-docs-master/docs/dev/python/datastream/data_types/#passing-python-records-to-java-operations for more details. If that's not the case, please provide a simple example which could reproduce this issue. was (Author: dianfu): >From the exception stack: {code:java} Caused by: java.lang.ClassCastException: class org.apache.flink.api.java.tuple.Tuple2 cannot be cast to class org.apache.flink.types.Row (org.apache.flink.api.java.tuple.Tuple2 and org.apache.flink.types.Row are in unnamed module of loader 'app')* {code} I believe that the result type of result datastream is Tuple, however the sink accepts Row datatype as inputs and so you need to convert the result type of the result datastream to Row before write into sink. Could refer to https://nightlies.apache.org/flink/flink-docs-master/docs/dev/python/datastream/data_types/#passing-python-records-to-java-operations for more details. > 使用 pyflink1.17dev datastream 经过reduce 后 add_sink(FlinkKafkaProducer()) 有问题 > > > Key: FLINK-29608 > URL: https://issues.apache.org/jira/browse/FLINK-29608 > Project: Flink > Issue Type: New Feature > Components: API / Python > Environment: python 3.8 > pyflink 1.17dev >Reporter: 王伟 >Priority: Minor > > 13> (1,missing) > 6> (1,offset) > Traceback (most recent call last): > File "stream.py", line 84, in > main() > File "stream.py", line 79, in main > env.execute('datastream_api_demo') > File > "/home/ustc/anaconda3/envs/pcb_server/lib/python3.8/site-packages/pyflink/datastream/stream_execution_environment.py", > line 764, in execute > return > JobExecutionResult(self._j_stream_execution_environment.execute(j_stream_graph)) > File > "/home/ustc/anaconda3/envs/pcb_server/lib/python3.8/site-packages/py4j/java_gateway.py", > line 1321, in __call__ > return_value = get_return_value( > File > "/home/ustc/anaconda3/envs/pcb_server/lib/python3.8/site-packages/pyflink/util/exceptions.py", > line 146, in deco > return f(*a, **kw) > File > "/home/ustc/anaconda3/envs/pcb_server/lib/python3.8/site-packages/py4j/protocol.py", > line 326, in get_return_value > raise Py4JJavaError( > py4j.protocol.Py4JJavaError: An error occurred while calling o0.execute. > : org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
> at > org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144) > at > org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141) > at > java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642) > at > java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) > at > java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) > at > org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:267) > at > java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) > at > java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) > at > java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) > at > java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) > at > org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1277) > at > org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$1(ClassLoadingUtils.java:93) > at > org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) > at > org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92) > at > java.bas
[jira] [Comment Edited] (FLINK-29608) Problem with add_sink(FlinkKafkaProducer()) after reduce when using the pyflink1.17dev datastream API
[ https://issues.apache.org/jira/browse/FLINK-29608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17618034#comment-17618034 ] Dian Fu edited comment on FLINK-29608 at 10/15/22 7:35 AM: --- >From the exception stack: {code:java} Caused by: java.lang.ClassCastException: class org.apache.flink.api.java.tuple.Tuple2 cannot be cast to class org.apache.flink.types.Row (org.apache.flink.api.java.tuple.Tuple2 and org.apache.flink.types.Row are in unnamed module of loader 'app')* {code} I believe that the result type of result datastream is Tuple, however the sink accepts Row datatype as inputs and so you need to convert the result type of the result datastream to Row before write into sink. Could refer to https://nightlies.apache.org/flink/flink-docs-master/docs/dev/python/datastream/data_types/#passing-python-records-to-java-operations for more details. was (Author: dianfu): >From the exception stack: ``` I don't > 使用 pyflink1.17dev datastream 经过reduce 后 add_sink(FlinkKafkaProducer()) 有问题 > > > Key: FLINK-29608 > URL: https://issues.apache.org/jira/browse/FLINK-29608 > Project: Flink > Issue Type: New Feature > Components: API / Python > Environment: python 3.8 > pyflink 1.17dev >Reporter: 王伟 >Priority: Minor > > 13> (1,missing) > 6> (1,offset) > Traceback (most recent call last): > File "stream.py", line 84, in > main() > File "stream.py", line 79, in main > env.execute('datastream_api_demo') > File > "/home/ustc/anaconda3/envs/pcb_server/lib/python3.8/site-packages/pyflink/datastream/stream_execution_environment.py", > line 764, in execute > return > JobExecutionResult(self._j_stream_execution_environment.execute(j_stream_graph)) > File > "/home/ustc/anaconda3/envs/pcb_server/lib/python3.8/site-packages/py4j/java_gateway.py", > line 1321, in __call__ > return_value = get_return_value( > File > "/home/ustc/anaconda3/envs/pcb_server/lib/python3.8/site-packages/pyflink/util/exceptions.py", > line 146, in deco > return f(*a, **kw) > File > "/home/ustc/anaconda3/envs/pcb_server/lib/python3.8/site-packages/py4j/protocol.py", > line 326, in get_return_value > raise Py4JJavaError( > py4j.protocol.Py4JJavaError: An error occurred while calling o0.execute. > : org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
> at > org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144) > at > org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141) > at > java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642) > at > java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) > at > java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) > at > org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:267) > at > java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) > at > java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) > at > java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) > at > java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) > at > org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1277) > at > org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$1(ClassLoadingUtils.java:93) > at > org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) > at > org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92) > at > java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) > at > java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) > at > java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) > at > java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) > at > org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47) > at akka.dispatch.OnComplete.internal(Future.scala:300) > at akka.dispatch.OnComplete.internal(Future.scala:297) > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224) > at ak
[jira] [Commented] (FLINK-29608) Problem with add_sink(FlinkKafkaProducer()) after reduce when using the pyflink1.17dev datastream API
[ https://issues.apache.org/jira/browse/FLINK-29608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17618034#comment-17618034 ] Dian Fu commented on FLINK-29608: - >From the exception stack: ``` I don't > 使用 pyflink1.17dev datastream 经过reduce 后 add_sink(FlinkKafkaProducer()) 有问题 > > > Key: FLINK-29608 > URL: https://issues.apache.org/jira/browse/FLINK-29608 > Project: Flink > Issue Type: New Feature > Components: API / Python > Environment: python 3.8 > pyflink 1.17dev >Reporter: 王伟 >Priority: Minor > > 13> (1,missing) > 6> (1,offset) > Traceback (most recent call last): > File "stream.py", line 84, in > main() > File "stream.py", line 79, in main > env.execute('datastream_api_demo') > File > "/home/ustc/anaconda3/envs/pcb_server/lib/python3.8/site-packages/pyflink/datastream/stream_execution_environment.py", > line 764, in execute > return > JobExecutionResult(self._j_stream_execution_environment.execute(j_stream_graph)) > File > "/home/ustc/anaconda3/envs/pcb_server/lib/python3.8/site-packages/py4j/java_gateway.py", > line 1321, in __call__ > return_value = get_return_value( > File > "/home/ustc/anaconda3/envs/pcb_server/lib/python3.8/site-packages/pyflink/util/exceptions.py", > line 146, in deco > return f(*a, **kw) > File > "/home/ustc/anaconda3/envs/pcb_server/lib/python3.8/site-packages/py4j/protocol.py", > line 326, in get_return_value > raise Py4JJavaError( > py4j.protocol.Py4JJavaError: An error occurred while calling o0.execute. > : org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > at > org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144) > at > org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141) > at > java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642) > at > java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) > at > java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) > at > org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:267) > at > java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) > at > java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) > at > java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) > at > java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) > at > org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1277) > at > org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$1(ClassLoadingUtils.java:93) > at > org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) > at > org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92) > at > java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) > at > java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) > at > java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) > at > java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) > at > 
org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47) > at akka.dispatch.OnComplete.internal(Future.scala:300) > at akka.dispatch.OnComplete.internal(Future.scala:297) > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224) > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221) > at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60) > at > org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65) > at > scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68) > at > scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284) > at > scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284) > at > scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284) > at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:621) > at > akka.pattern.