[GitHub] [flink] sunxiaoguang commented on pull request #15365: [FLINK-21108][rest-client] Add basic & digest auth support to rest client
sunxiaoguang commented on pull request #15365: URL: https://github.com/apache/flink/pull/15365#issuecomment-806385762 I'm working on unit tests at the moment; comments on the design and implementation of this feature are welcome. Thanks -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-21675) Push filter into the scan when watermark assigner is the parent of the table scan
[ https://issues.apache.org/jira/browse/FLINK-21675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17308396#comment-17308396 ] Yuval Itzchakov commented on FLINK-21675: - @Shengkai Fang Hey, I would appreciate it if you could take the time to review this. > Push filter into the scan when watermark assigner is the parent of the table > scan > - > > Key: FLINK-21675 > URL: https://issues.apache.org/jira/browse/FLINK-21675 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Planner > Reporter: Shengkai Fang > Priority: Major > Labels: pull-request-available > > When the watermark assigner is the parent of the table scan, it blocks the rule that applies filter push-down. We can add a rule similar to > {{ProjectWatermarkAssignerTransposeRule}}, or extend rules like > {{PushWatermarkIntoTableSourceScanAcrossCalcRule}} and > {{PushWatermarkIntoTableSourceScanRule}}. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot commented on pull request #15366: Sql file
flinkbot commented on pull request #15366: URL: https://github.com/apache/flink/pull/15366#issuecomment-806382815 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit c2cb1540e5d935c3c440c68f8883fe393504dee7 (Thu Mar 25 05:49:19 UTC 2021) **Warnings:** * No documentation files were touched! Remember to keep the Flink docs up to date! * **Invalid pull request title: No valid Jira ID provided** Mention the bot in a comment to re-run the automated checks. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] wuchong commented on a change in pull request #15265: [FLINK-21836][table-api] Introduce ParseStrategyParser
wuchong commented on a change in pull request #15265: URL: https://github.com/apache/flink/pull/15265#discussion_r601068115 ## File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/delegation/ParserImplTest.java ## @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.delegation; + +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CatalogManager; +import org.apache.flink.table.catalog.FunctionCatalog; +import org.apache.flink.table.catalog.GenericInMemoryCatalog; +import org.apache.flink.table.delegation.Parser; +import org.apache.flink.table.module.ModuleManager; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.planner.calcite.FlinkPlannerImpl; +import org.apache.flink.table.planner.catalog.CatalogManagerCalciteSchema; +import org.apache.flink.table.utils.CatalogManagerMocks; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.function.Supplier; + +import static org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.fail; + +/** Test for {@link ParserImpl}. 
*/ +public class ParserImplTest { + +@Rule public ExpectedException thrown = ExpectedException.none(); + +private final boolean isStreamingMode = false; +private final TableConfig tableConfig = new TableConfig(); +private final Catalog catalog = new GenericInMemoryCatalog("MockCatalog", "default"); +private final CatalogManager catalogManager = + CatalogManagerMocks.preparedCatalogManager().defaultCatalog("builtin", catalog).build(); +private final ModuleManager moduleManager = new ModuleManager(); +private final FunctionCatalog functionCatalog = +new FunctionCatalog(tableConfig, catalogManager, moduleManager); +private final PlannerContext plannerContext = +new PlannerContext( +tableConfig, +functionCatalog, +catalogManager, +asRootSchema(new CatalogManagerCalciteSchema(catalogManager, isStreamingMode)), +new ArrayList<>()); + +private final Supplier plannerSupplier = +() -> +plannerContext.createFlinkPlanner( +catalogManager.getCurrentCatalog(), +catalogManager.getCurrentDatabase()); + +private final Parser parser = +new ParserImpl( +catalogManager, +plannerSupplier, +() -> plannerSupplier.get().parser(), +t -> +plannerContext.createSqlExprToRexConverter( + plannerContext.getTypeFactory().buildRelNodeRowType(t))); + +private List testLegalStatements; +private List testIllegalStatements; + +@Before +public void setup() { +testLegalStatements = +Arrays.asList( + TestSpec.forStatement("ClEaR").expectedSummary("CLEAR"), +TestSpec.forStatement("hElP").expectedSummary("HELP"), +TestSpec.forStatement("qUIT").expectedSummary("QUIT"), +TestSpec.forStatement("ExIT").expectedSummary("EXIT"), + TestSpec.forStatement("REsEt").expectedSummary("RESET"), +TestSpec.forStatement(" SEt ").expectedSummary("SET"), +TestSpec.forStatement("SET execution.runtime-type=batch") +.expectedSummary("SET execution.runtime-type=batch"), +TestSpec.forStatement("SET pipeline.jars = /path/to/test-_-jar.jar") +.expectedSummary("SET
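The test specs quoted above exercise case-insensitive, whitespace-tolerant matching of interactive client commands ("ClEaR" → "CLEAR", " SEt " → "SET", and `SET key=value` statements whose tail is kept verbatim). As a rough sketch of what such matching can look like — the class and method names below are hypothetical and not taken from Flink's actual ParserImpl:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Illustrative sketch (names are hypothetical, not Flink's) of the
 * case-insensitive command matching that the test specs above exercise.
 */
public class CommandMatcher {

    private static final List<String> COMMANDS =
            Arrays.asList("CLEAR", "HELP", "QUIT", "EXIT", "RESET", "SET");

    /** Returns the normalized summary, e.g. " SEt " -> "SET", or empty if no match. */
    static Optional<String> summarize(String statement) {
        String trimmed = statement.trim();
        for (String command : COMMANDS) {
            // A bare command matches case-insensitively after trimming.
            if (trimmed.equalsIgnoreCase(command)) {
                return Optional.of(command);
            }
            // "SET key = value" style: normalize the keyword, keep the tail verbatim.
            Matcher m = Pattern.compile("(?i)^" + command + "\\s+(.+)$").matcher(trimmed);
            if (m.matches()) {
                return Optional.of(command + " " + m.group(1).trim());
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(summarize("ClEaR"));  // Optional[CLEAR]
        System.out.println(summarize(" SEt ")); // Optional[SET]
        System.out.println(summarize("SET execution.runtime-type=batch"));
    }
}
```

A real parser would fall back to full SQL parsing when no command matches; the sketch just returns an empty Optional for that case.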
[GitHub] [flink] flinkbot commented on pull request #15365: [FLINK-21108][rest-client] Add basic & digest auth support to rest client
flinkbot commented on pull request #15365: URL: https://github.com/apache/flink/pull/15365#issuecomment-806382094 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit 3441a04092840a70400afa3f8a087d5add4c2ba7 (Thu Mar 25 05:47:29 UTC 2021) **Warnings:** * No documentation files were touched! Remember to keep the Flink docs up to date! Mention the bot in a comment to re-run the automated checks. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] fsk119 opened a new pull request #15366: Sql file
fsk119 opened a new pull request #15366: URL: https://github.com/apache/flink/pull/15366 ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. *(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature?
(yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
[GitHub] [flink] sunxiaoguang opened a new pull request #15365: [FLINK-21108][rest-client] Add basic & digest auth support to rest client
sunxiaoguang opened a new pull request #15365: URL: https://github.com/apache/flink/pull/15365 Signed-off-by: Xiaoguang Sun ## What is the purpose of the change Implement the Basic & Digest auth schemes in the rest client so that the Flink rest server can be protected behind a proxy with the corresponding authentication enabled. ## Brief change log - *On receiving an authentication challenge, restart the request and resend it with the corresponding authorization response.* - *Abstract the authentication scheme and implement the two most widely used schemes, Basic and Digest.* - *Add a special exception type RestartableException to the Flink runtime concurrent package so that a task that can be restarted immediately is handled differently in FutureUtils.java.* - *Add two new config options to the rest client to specify the username and password.* ## Verifying this change This change added tests and can be verified as follows: - *TODO* ## Does this pull request potentially affect one of the following parts: NA ## Documentation - The Flink rest client can be verified against a secured proxy sitting in front of the real Job Manager, making the Job Manager available only to users with valid credentials.
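For context on what the Basic and Digest schemes actually compute, here is a self-contained sketch using only the JDK; the class and method names are illustrative and not part of this PR. Basic auth is base64 of `user:password` (RFC 7617); the simplest Digest variant (RFC 7616, MD5 without `qop`) chains three MD5 hashes:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.Base64;

/**
 * Illustrative sketch (not the PR's implementation): the values an HTTP
 * client computes to answer Basic and Digest authentication challenges.
 */
public class AuthSketch {

    /** Basic auth: "Basic " + base64("user:password"), per RFC 7617. */
    static String basicHeader(String user, String password) {
        String credentials = user + ":" + password;
        return "Basic "
                + Base64.getEncoder()
                        .encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
    }

    /**
     * Digest auth (RFC 7616, simplest MD5 variant without qop):
     * response = MD5( MD5(user:realm:password) ":" nonce ":" MD5(method:uri) ).
     */
    static String digestResponse(
            String user, String realm, String password,
            String method, String uri, String nonce) {
        String ha1 = md5Hex(user + ":" + realm + ":" + password);
        String ha2 = md5Hex(method + ":" + uri);
        return md5Hex(ha1 + ":" + nonce + ":" + ha2);
    }

    static String md5Hex(String input) {
        try {
            byte[] digest =
                    MessageDigest.getInstance("MD5")
                            .digest(input.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (Exception e) {
            throw new IllegalStateException("MD5 is always available on the JVM", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(basicHeader("user", "pass")); // Basic dXNlcjpwYXNz
        System.out.println(digestResponse("user", "realm", "pass", "GET", "/jobs", "abc123"));
    }
}
```

The interesting part for the PR is the retry flow: the server's 401 carries the realm and nonce, so the client can only build the Authorization header after the first attempt fails, which is why the request must be restartable.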
[GitHub] [flink] flinkbot edited a comment on pull request #15148: [FLINK-21731] Add benchmarks for DefaultScheduler's creation, scheduling and deploying
flinkbot edited a comment on pull request #15148: URL: https://github.com/apache/flink/pull/15148#issuecomment-796572621 ## CI report: * cd1977e701621bdd1f827dce3f0b463b100958c0 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=15335) Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=15378) * 9e6f3acd643ac1e479af96b8073c3c0d72952d75 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=15419) * 515a4b2e72d8d0a37d208dfddca0bf89cc642b78 UNKNOWN * 4be6c3acfcdabd369d386fb740817bdfe5e7056d Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=15423) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink-statefun-playground] asfgit closed pull request #7: [FLINK-21932] Add Python SDK showcase
asfgit closed pull request #7: URL: https://github.com/apache/flink-statefun-playground/pull/7
[GitHub] [flink] wsry commented on a change in pull request #13924: [FLINK-19938][network] Implement shuffle data read scheduling for sort-merge blocking shuffle
wsry commented on a change in pull request #13924: URL: https://github.com/apache/flink/pull/13924#discussion_r601060222 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReader.java ## @@ -176,21 +202,28 @@ public void resumeConsumption() { @Override public Throwable getFailureCause() { -// we can never throw an error after this was created -return null; +synchronized (lock) { +return failureCause; +} } @Override public boolean isAvailable(int numCreditsAvailable) { -if (numCreditsAvailable > 0) { -return !buffersRead.isEmpty(); -} +synchronized (lock) { +if (isReleased) { +return true; +} -return !buffersRead.isEmpty() && !buffersRead.peek().isBuffer(); +if (numCreditsAvailable > 0) { +return !buffersRead.isEmpty(); +} + +return !buffersRead.isEmpty() && !buffersRead.peek().isBuffer(); +} } @Override public int unsynchronizedGetNumberOfQueuedBuffers() { -return 0; +return buffersRead.size(); Review comment: Because the method name has the `unsynchronized` prefix, I think the intent is to avoid letting metrics collection affect performance. PipelinedSubpartition does something similar. Maybe we should also use Math.max(buffersRead.size(), 0) like PipelinedSubpartition does.
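The Math.max clamp the review discusses can be sketched in isolation; QueueSizeMetric and its methods below are hypothetical names for illustration, not Flink classes:

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Sketch of the clamping idea discussed above: a metrics thread may read the
 * queue size without holding the lock and observe a transiently inconsistent
 * value, so the reading is clamped to be non-negative. Names are illustrative.
 */
public class QueueSizeMetric {

    private final Object lock = new Object();
    private final Deque<Object> buffersRead = new ArrayDeque<>();

    void add(Object buffer) {
        synchronized (lock) {
            buffersRead.add(buffer);
        }
    }

    /** Unsynchronized, metrics-only view of the queue size; may be stale. */
    int unsynchronizedGetNumberOfQueuedBuffers() {
        // size() is read without the lock on purpose (metrics collection must
        // not contend with the hot path); clamp so callers never see a
        // negative value from a torn or stale read.
        return Math.max(buffersRead.size(), 0);
    }
}
```

The trade-off is that the metric is best-effort: it may lag behind the true size, which is acceptable for monitoring but would not be for correctness-critical reads.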
[GitHub] [flink] wsry commented on a change in pull request #13924: [FLINK-19938][network] Implement shuffle data read scheduling for sort-merge blocking shuffle
wsry commented on a change in pull request #13924: URL: https://github.com/apache/flink/pull/13924#discussion_r601057131 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReader.java ## @@ -176,21 +202,28 @@ public void resumeConsumption() { @Override public Throwable getFailureCause() { -// we can never throw an error after this was created -return null; +synchronized (lock) { +return failureCause; +} } @Override public boolean isAvailable(int numCreditsAvailable) { -if (numCreditsAvailable > 0) { -return !buffersRead.isEmpty(); -} +synchronized (lock) { +if (isReleased) { +return true; +} -return !buffersRead.isEmpty() && !buffersRead.peek().isBuffer(); +if (numCreditsAvailable > 0) { +return !buffersRead.isEmpty(); +} + +return !buffersRead.isEmpty() && !buffersRead.peek().isBuffer(); +} } @Override public int unsynchronizedGetNumberOfQueuedBuffers() { Review comment: This method is only used for metrics. I agree we should add some Javadoc to the methods of ResultSubpartitionView, but that is out of the scope of this PR. Maybe we can do that improvement in a future PR.
[GitHub] [flink] wsry commented on a change in pull request #13924: [FLINK-19938][network] Implement shuffle data read scheduling for sort-merge blocking shuffle
wsry commented on a change in pull request #13924: URL: https://github.com/apache/flink/pull/13924#discussion_r601053647 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFileReader.java ## @@ -61,104 +56,69 @@ /** Next data region to be read. */ private int nextRegionToRead; +/** Next file offset to be read. */ +private long nextOffsetToRead; + /** Number of remaining buffers in the current data region read. */ private int currentRegionRemainingBuffers; -/** Whether this partitioned file reader is closed. */ -private boolean isClosed; - -public PartitionedFileReader(PartitionedFile partitionedFile, int targetSubpartition) +public PartitionedFileReader( +PartitionedFile partitionedFile, +int targetSubpartition, +FileChannel dataFileChannel, +FileChannel indexFileChannel) throws IOException { +checkArgument( +dataFileChannel.isOpen() && indexFileChannel.isOpen(), +"Both data file channel and index file channel must be opened."); + this.partitionedFile = checkNotNull(partitionedFile); this.targetSubpartition = targetSubpartition; +this.dataFileChannel = checkNotNull(dataFileChannel); +this.indexFileChannel = checkNotNull(indexFileChannel); this.indexEntryBuf = ByteBuffer.allocateDirect(PartitionedFile.INDEX_ENTRY_SIZE); BufferReaderWriterUtil.configureByteBuffer(indexEntryBuf); - -this.dataFileChannel = openFileChannel(partitionedFile.getDataFilePath()); -try { -this.indexFileChannel = openFileChannel(partitionedFile.getIndexFilePath()); -} catch (Throwable throwable) { -IOUtils.closeQuietly(dataFileChannel); -throw throwable; -} } -private FileChannel openFileChannel(Path path) throws IOException { -return FileChannel.open(path, StandardOpenOption.READ); -} - -private boolean moveToNextReadableRegion() throws IOException { -if (currentRegionRemainingBuffers > 0) { -return true; -} - -while (nextRegionToRead < partitionedFile.getNumRegions()) { +private void moveToNextReadableRegion() throws IOException { +while 
(currentRegionRemainingBuffers <= 0 +&& nextRegionToRead < partitionedFile.getNumRegions()) { partitionedFile.getIndexEntry( indexFileChannel, indexEntryBuf, nextRegionToRead, targetSubpartition); -long dataOffset = indexEntryBuf.getLong(); +nextOffsetToRead = indexEntryBuf.getLong(); currentRegionRemainingBuffers = indexEntryBuf.getInt(); ++nextRegionToRead; - -if (currentRegionRemainingBuffers > 0) { -dataFileChannel.position(dataOffset); -return true; -} } - -return false; } /** - * Reads a buffer from the {@link PartitionedFile} and moves the read position forward. + * Reads a buffer from the current region of the target {@link PartitionedFile} and moves the + * read position forward. * * Note: The caller is responsible for recycling the target buffer if any exception occurs. */ @Nullable -public Buffer readBuffer(MemorySegment target, BufferRecycler recycler) throws IOException { -checkState(!isClosed, "File reader is already closed."); - -if (moveToNextReadableRegion()) { ---currentRegionRemainingBuffers; -return readFromByteChannel(dataFileChannel, headerBuf, target, recycler); +public Buffer readCurrentRegion(MemorySegment target, BufferRecycler recycler) +throws IOException { +if (currentRegionRemainingBuffers == 0) { Review comment: As the method name indicates, this method only reads data from the current region. I will make the methods package-private.
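The hunk above reads each index entry as a long file offset followed by an int buffer count (`indexEntryBuf.getLong()` then `indexEntryBuf.getInt()`). A small self-contained sketch of that layout — the 12-byte entry size here is an assumption for illustration and not necessarily Flink's actual PartitionedFile.INDEX_ENTRY_SIZE:

```java
import java.nio.ByteBuffer;

/**
 * Sketch of the index-entry layout implied by the hunk above: a region is
 * located by a long offset into the data file plus an int buffer count.
 */
public class IndexEntrySketch {

    // Assumed entry size for this sketch: one long + one int = 12 bytes.
    static final int INDEX_ENTRY_SIZE = Long.BYTES + Integer.BYTES;

    static ByteBuffer writeEntry(long fileOffset, int numBuffers) {
        ByteBuffer entry = ByteBuffer.allocate(INDEX_ENTRY_SIZE);
        entry.putLong(fileOffset); // where the region starts in the data file
        entry.putInt(numBuffers);  // how many buffers the region holds
        entry.flip();              // switch from writing to reading
        return entry;
    }

    static long[] readEntry(ByteBuffer entry) {
        // Mirrors the reader: nextOffsetToRead, then currentRegionRemainingBuffers.
        long nextOffsetToRead = entry.getLong();
        int currentRegionRemainingBuffers = entry.getInt();
        return new long[] {nextOffsetToRead, currentRegionRemainingBuffers};
    }

    public static void main(String[] args) {
        long[] e = readEntry(writeEntry(4096L, 7));
        System.out.println(e[0] + " " + e[1]); // 4096 7
    }
}
```

Keeping the offset in the entry (rather than positioning the channel eagerly, as the removed lines did) is what lets the refactored reader share one FileChannel across readers.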
[GitHub] [flink] flinkbot edited a comment on pull request #15314: [FLINK-21332][runtime] Optimize releasing result partitions in RegionPartitionReleaseStrategy
flinkbot edited a comment on pull request #15314: URL: https://github.com/apache/flink/pull/15314#issuecomment-803903444 ## CI report: * 7512eec2a3d84fb8211cf9b1161a25b7e8b8800e Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=15380) * 2dc45dbffe9740c0af706ff2c0ce6d241620be9e Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=15422) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink-statefun-playground] tzulitai commented on a change in pull request #7: [FLINK-21932] Add Python SDK showcase
tzulitai commented on a change in pull request #7: URL: https://github.com/apache/flink-statefun-playground/pull/7#discussion_r601044648 ## File path: python/showcase/README.md ## @@ -0,0 +1,264 @@ +# StateFun Python SDK Showcase + +This project is intended for new StateFun users that would like to start implementing their StateFun application functions using Python. +The tutorial is streamlined and split into a few parts which we recommend to go through a specific order, as lay out below. +Each part is demonstrated with some code snippets plus comments to guide you through the SDK fundamentals. + +## Prerequisites + +- python3 +- pip +- docker +- docker-compose + +## Building the example + +### Using venv + +``` +python3 -m venv venv +source venv/bin/activate +pip3 install . +``` + +## Tutorial Sections + +The [__main__.py](showcase/__main__.py) file demonstrates SDK concepts at length, and highly recommend +to read trough it. The sections below are copied from that file with some of the comments removed. Review comment: typo: `trough` -> `through` ## File path: python/showcase/README.md ## @@ -0,0 +1,264 @@ +# StateFun Python SDK Showcase + +This project is intended for new StateFun users that would like to start implementing their StateFun application functions using Python. +The tutorial is streamlined and split into a few parts which we recommend to go through a specific order, as lay out below. +Each part is demonstrated with some code snippets plus comments to guide you through the SDK fundamentals. + +## Prerequisites + +- python3 +- pip +- docker +- docker-compose + +## Building the example + +### Using venv + +``` +python3 -m venv venv +source venv/bin/activate +pip3 install .
+``` + +## Tutorial Sections + +The [__main__.py](showcase/__main__.py) file demonstrates SDK concepts at length, and highly recommend Review comment: ```suggestion The [__main__.py](showcase/__main__.py) file demonstrates SDK concepts at length, and it is highly recommended ``` ## File path: python/showcase/showcase/__main__.py ## @@ -0,0 +1,289 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import asyncio + +from datetime import timedelta + +from aiohttp import web +from statefun import * + +from .showcase_custom_types_pb2 import UserProfile +from .showcase_custom_types import GREET_JSON_TYPE, USER_PROFILE_PROTOBUF_TYPE + +functions = StatefulFunctions() + + +# Showcase Part 1: Type System +# +# This function demonstrates StateFun's type system using the Python SDK. +# +# Core Type abstraction +# = +# The core abstraction used by StateFun's type system is the Type interface, which +# consists of a few things that StateFun uses to handle messages and state values: +# +# A TypeName to identify the type. +# A TypeSerializer for serializing and deserializing instances of the type. 
+# +# Cross-language primitive types +# == +# StateFun's type system has cross-language support for common primitive types, such as boolean, +# integer, long, etc. These primitive types have built-in Types implemented for them +# already, with predefined typenames. +# +# This is of course all transparent for the user, so you don't need to worry about it. Functions +# implemented in various languages (e.g. Java or Python) can message each other by directly sending +# supported primitive values as message arguments. Moreover, the type system is used for state +# values as well; so, you can expect that a function can safely read previous state after +# reimplementing it in a different language. We'll cover more on state storage access in later +# parts of the showcase series. +# +# Common custom types (e.g. JSON or Protobuf) +# === +# The type system is also very easily extensible to support custom message types, such as JSON +# or Protobuf messages. This is just a matter of implementing your own Type
[GitHub] [flink-statefun] tzulitai commented on pull request #218: [FLINK-21955]-[FLINK-21960] Pre-release sweep.
tzulitai commented on pull request #218: URL: https://github.com/apache/flink-statefun/pull/218#issuecomment-806366859 LGTM from our offline hackathon :) Merged this!
[GitHub] [flink-statefun] tzulitai commented on a change in pull request #218: [FLINK-21955]-[FLINK-21960] Pre-release sweep.
tzulitai commented on a change in pull request #218: URL: https://github.com/apache/flink-statefun/pull/218#discussion_r601043830 ## File path: statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/RoutableProtobufKafkaIngressDeserializer.java ## @@ -58,4 +60,17 @@ public Message deserialize(ConsumerRecord input) { .setPayloadBytes(MoreByteStrings.wrap(payload)) .build(); } + + private byte[] requireNonNullKey(byte[] key) { Review comment: I've added this while merging.
[GitHub] [flink-statefun] tzulitai merged pull request #218: [FLINK-21955]-[FLINK-21960] Pre-release sweep.
tzulitai merged pull request #218: URL: https://github.com/apache/flink-statefun/pull/218
[GitHub] [flink-statefun] tzulitai closed pull request #218: [FLINK-21955]-[FLINK-21960] Pre-release sweep.
tzulitai closed pull request #218: URL: https://github.com/apache/flink-statefun/pull/218
[GitHub] [flink] wuchong commented on a change in pull request #15265: [FLINK-21836][table-api] Introduce ParseStrategyParser
wuchong commented on a change in pull request #15265: URL: https://github.com/apache/flink/pull/15265#discussion_r601036827 ## File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/delegation/ParserImplTest.java ## @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.delegation; + +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CatalogManager; +import org.apache.flink.table.catalog.FunctionCatalog; +import org.apache.flink.table.catalog.GenericInMemoryCatalog; +import org.apache.flink.table.delegation.Parser; +import org.apache.flink.table.module.ModuleManager; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.planner.calcite.FlinkPlannerImpl; +import org.apache.flink.table.planner.catalog.CatalogManagerCalciteSchema; +import org.apache.flink.table.utils.CatalogManagerMocks; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.function.Supplier; + +import static org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.fail; + +/** Test for {@link ParserImpl}. 
*/ +public class ParserImplTest { + +@Rule public ExpectedException thrown = ExpectedException.none(); + +private final boolean isStreamingMode = false; +private final TableConfig tableConfig = new TableConfig(); +private final Catalog catalog = new GenericInMemoryCatalog("MockCatalog", "default"); +private final CatalogManager catalogManager = + CatalogManagerMocks.preparedCatalogManager().defaultCatalog("builtin", catalog).build(); +private final ModuleManager moduleManager = new ModuleManager(); +private final FunctionCatalog functionCatalog = +new FunctionCatalog(tableConfig, catalogManager, moduleManager); +private final PlannerContext plannerContext = +new PlannerContext( +tableConfig, +functionCatalog, +catalogManager, +asRootSchema(new CatalogManagerCalciteSchema(catalogManager, isStreamingMode)), +new ArrayList<>()); + +private final Supplier plannerSupplier = +() -> +plannerContext.createFlinkPlanner( +catalogManager.getCurrentCatalog(), +catalogManager.getCurrentDatabase()); + +private final Parser parser = +new ParserImpl( +catalogManager, +plannerSupplier, +() -> plannerSupplier.get().parser(), +t -> +plannerContext.createSqlExprToRexConverter( + plannerContext.getTypeFactory().buildRelNodeRowType(t))); + +private List testLegalStatements; +private List testIllegalStatements; + +@Before +public void setup() { +testLegalStatements = +Arrays.asList( + TestSpec.forStatement("ClEaR").expectedSummary("CLEAR"), +TestSpec.forStatement("hElP").expectedSummary("HELP"), +TestSpec.forStatement("qUIT").expectedSummary("QUIT"), +TestSpec.forStatement("ExIT").expectedSummary("EXIT"), + TestSpec.forStatement("REsEt").expectedSummary("RESET"), +TestSpec.forStatement(" SEt ").expectedSummary("SET"), +TestSpec.forStatement("SET execution.runtime-type=batch") +.expectedSummary("SET execution.runtime-type=batch"), +TestSpec.forStatement("SET pipeline.jars = /path/to/test-_-jar.jar") +.expectedSummary("SET
[GitHub] [flink] flinkbot edited a comment on pull request #15314: [FLINK-21332][runtime] Optimize releasing result partitions in RegionPartitionReleaseStrategy
flinkbot edited a comment on pull request #15314: URL: https://github.com/apache/flink/pull/15314#issuecomment-803903444 ## CI report: * 7512eec2a3d84fb8211cf9b1161a25b7e8b8800e Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=15380) * 2dc45dbffe9740c0af706ff2c0ce6d241620be9e UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] flinkbot edited a comment on pull request #15148: [FLINK-21731] Add benchmarks for DefaultScheduler's creation, scheduling and deploying
flinkbot edited a comment on pull request #15148: URL: https://github.com/apache/flink/pull/15148#issuecomment-796572621 ## CI report: * cd1977e701621bdd1f827dce3f0b463b100958c0 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=15335) Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=15378) * 9e6f3acd643ac1e479af96b8073c3c0d72952d75 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=15419) * 515a4b2e72d8d0a37d208dfddca0bf89cc642b78 UNKNOWN * 4be6c3acfcdabd369d386fb740817bdfe5e7056d UNKNOWN
[GitHub] [flink] leonardBang commented on a change in pull request #15303: [FLINK-21713][table-api/table-planner] Correct function CURRENT_TIMESTAMP/CURRENT_TIME/CURRENT_DATE/NOW/LOCALTIME/LOCALTIMEST
leonardBang commented on a change in pull request #15303: URL: https://github.com/apache/flink/pull/15303#discussion_r601035897 ## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/NonDeterministicTests.scala ## @@ -20,52 +20,164 @@ package org.apache.flink.table.planner.expressions import java.sql.Time import java.time.format.DateTimeFormatter -import java.time.{LocalDateTime, ZoneId} +import java.time.{LocalDate, LocalDateTime, ZoneId} +import org.apache.flink.api.common.RuntimeExecutionMode import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.configuration.ExecutionOptions import org.apache.flink.table.api._ import org.apache.flink.table.functions.ScalarFunction import org.apache.flink.table.planner.expressions.utils.ExpressionTestBase import org.apache.flink.types.Row +import org.junit.Assert.assertEquals import org.junit.Test +import scala.collection.mutable + /** * Tests that check all non-deterministic functions can be executed. 
*/ class NonDeterministicTests extends ExpressionTestBase { @Test - def testCurrentDate(): Unit = { + def testCurrentDateTime(): Unit = { testAllApis( currentDate().isGreater("1970-01-01".toDate), - "currentDate() > '1970-01-01'.toDate", "CURRENT_DATE > DATE '1970-01-01'", "true") - } - @Test - def testCurrentTime(): Unit = { testAllApis( currentTime().isGreaterOrEqual("00:00:00".toTime), - "currentTime() >= '00:00:00'.toTime", "CURRENT_TIME >= TIME '00:00:00'", "true") + +testAllApis( + currentTimestamp().isGreater( +"1970-01-01 00:00:00".cast(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())), + s"CURRENT_TIMESTAMP > ${timestampLtz("1970-01-01 00:00:00")}", + "true") + +testSqlApi(s"NOW() > ${timestampLtz("1970-01-01 00:00:00")}", + "true") } @Test - def testCurrentTimestamp(): Unit = { -testAllApis( - currentTimestamp().isGreater("1970-01-01 00:00:00".toTimestamp), - "currentTimestamp() > '1970-01-01 00:00:00'.toTimestamp", - "CURRENT_TIMESTAMP > TIMESTAMP '1970-01-01 00:00:00'", + def testCurrentDateTimeInStreamMode(): Unit = { +val temporalFunctions = getCodeGenFunctions(List( + "CURRENT_DATE", + "CURRENT_TIME", + "CURRENT_TIMESTAMP", + "NOW()", + "LOCALTIME", + "LOCALTIMESTAMP")) +val round1 = evaluateFunctionResult(temporalFunctions) +Thread.sleep(1 * 1000L) +val round2: List[String] = evaluateFunctionResult(temporalFunctions) + +assertEquals(round1.size, round2.size) +round1.zip(round2).zipWithIndex.foreach(r => { + // CURRENT_DATE may be same between two records + if (r._2 == 0) { +assert(r._1._1 <= r._1._2) + } else { +assert(r._1._1 < r._1._2) + } +}) + } + + @Test + def testCurrentDateTimeInBatchMode(): Unit = { +config.getConfiguration.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH) +config.getConfiguration.setLong("__table.query-start.epoch-time__", 1000L) +config.setLocalTimeZone(ZoneId.of("Asia/Shanghai")) +val temporalFunctions = getCodeGenFunctions(List( + "CURRENT_DATE", + "CURRENT_TIME", + "CURRENT_TIMESTAMP", + "NOW()", + "LOCALTIME", 
+ "LOCALTIMESTAMP")) + +val expected = mutable.MutableList[String]( + "1970-01-01", + "00:00:01", + "1970-01-01 08:00:01", + "1970-01-01 08:00:01", + "08:00:01", + "1970-01-01 08:00:01") + +val result1 = evaluateFunctionResult(temporalFunctions) +assertEquals(expected.toList.sorted, result1.sorted) + +Thread.sleep(1 * 1000L) +val result2: List[String] = evaluateFunctionResult(temporalFunctions) +assertEquals(expected.toList.sorted, result2.sorted) Review comment: we need to check the batch mode always return same value in one query for different record -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] leonardBang commented on a change in pull request #15303: [FLINK-21713][table-api/table-planner] Correct function CURRENT_TIMESTAMP/CURRENT_TIME/CURRENT_DATE/NOW/LOCALTIME/LOCALTIMEST
leonardBang commented on a change in pull request #15303: URL: https://github.com/apache/flink/pull/15303#discussion_r601035545 ## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/NonDeterministicTests.scala ## @@ -20,52 +20,164 @@ package org.apache.flink.table.planner.expressions import java.sql.Time import java.time.format.DateTimeFormatter -import java.time.{LocalDateTime, ZoneId} +import java.time.{LocalDate, LocalDateTime, ZoneId} +import org.apache.flink.api.common.RuntimeExecutionMode import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.configuration.ExecutionOptions import org.apache.flink.table.api._ import org.apache.flink.table.functions.ScalarFunction import org.apache.flink.table.planner.expressions.utils.ExpressionTestBase import org.apache.flink.types.Row +import org.junit.Assert.assertEquals import org.junit.Test +import scala.collection.mutable + /** * Tests that check all non-deterministic functions can be executed. 
*/ class NonDeterministicTests extends ExpressionTestBase { @Test - def testCurrentDate(): Unit = { + def testCurrentDateTime(): Unit = { testAllApis( currentDate().isGreater("1970-01-01".toDate), - "currentDate() > '1970-01-01'.toDate", "CURRENT_DATE > DATE '1970-01-01'", "true") - } - @Test - def testCurrentTime(): Unit = { testAllApis( currentTime().isGreaterOrEqual("00:00:00".toTime), - "currentTime() >= '00:00:00'.toTime", "CURRENT_TIME >= TIME '00:00:00'", "true") + +testAllApis( + currentTimestamp().isGreater( +"1970-01-01 00:00:00".cast(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())), + s"CURRENT_TIMESTAMP > ${timestampLtz("1970-01-01 00:00:00")}", + "true") + +testSqlApi(s"NOW() > ${timestampLtz("1970-01-01 00:00:00")}", + "true") } @Test - def testCurrentTimestamp(): Unit = { -testAllApis( - currentTimestamp().isGreater("1970-01-01 00:00:00".toTimestamp), - "currentTimestamp() > '1970-01-01 00:00:00'.toTimestamp", - "CURRENT_TIMESTAMP > TIMESTAMP '1970-01-01 00:00:00'", + def testCurrentDateTimeInStreamMode(): Unit = { +val temporalFunctions = getCodeGenFunctions(List( + "CURRENT_DATE", + "CURRENT_TIME", + "CURRENT_TIMESTAMP", + "NOW()", + "LOCALTIME", + "LOCALTIMESTAMP")) +val round1 = evaluateFunctionResult(temporalFunctions) +Thread.sleep(1 * 1000L) +val round2: List[String] = evaluateFunctionResult(temporalFunctions) + +assertEquals(round1.size, round2.size) +round1.zip(round2).zipWithIndex.foreach(r => { + // CURRENT_DATE may be same between two records + if (r._2 == 0) { +assert(r._1._1 <= r._1._2) + } else { +assert(r._1._1 < r._1._2) + } +}) + } + + @Test + def testCurrentDateTimeInBatchMode(): Unit = { +config.getConfiguration.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH) +config.getConfiguration.setLong("__table.query-start.epoch-time__", 1000L) +config.setLocalTimeZone(ZoneId.of("Asia/Shanghai")) +val temporalFunctions = getCodeGenFunctions(List( + "CURRENT_DATE", + "CURRENT_TIME", + "CURRENT_TIMESTAMP", + "NOW()", + "LOCALTIME", 
+ "LOCALTIMESTAMP")) + +val expected = mutable.MutableList[String]( + "1970-01-01", + "00:00:01", + "1970-01-01 08:00:01", + "1970-01-01 08:00:01", + "08:00:01", + "1970-01-01 08:00:01") + +val result1 = evaluateFunctionResult(temporalFunctions) +assertEquals(expected.toList.sorted, result1.sorted) + +Thread.sleep(1 * 1000L) +val result2: List[String] = evaluateFunctionResult(temporalFunctions) +assertEquals(expected.toList.sorted, result2.sorted) + } + + @Test + def testCurrentTimestampInUTC(): Unit = { +config.setLocalTimeZone(ZoneId.of("UTC")) +val localDateTime = LocalDateTime.now(ZoneId.of("UTC")) + +val formattedCurrentDate = localDateTime + .format(DateTimeFormatter.ofPattern("-MM-dd")) +val formattedCurrentTime = localDateTime + .toLocalTime + .format(DateTimeFormatter.ofPattern("HH:mm:ss")) +val formattedCurrentTimestamp = localDateTime + .format(DateTimeFormatter.ofPattern("-MM-dd HH:mm:ss")) + +// the CURRENT_DATE/CURRENT_TIME/CURRENT_TIMESTAMP/NOW() functions are +// not deterministic, thus we +// use following pattern to check the returned SQL timestamp in session time zone UTC +testSqlApi( + s"DATE_SUB(CURRENT_DATE, DATE '$formattedCurrentDate') = 0", + "true") + +testSqlApi( + s"TIME_SUB(CURRENT_TIME, TIME '$formattedCurrentTime') <= 6", + "true") + +testSqlApi( + s"${timestampLtz(formattedCurrentTimestamp)} < CURRENT_TIMESTAMP", + "true") + +testSqlApi( + s"${timestampLtz(formattedCurrentTimestamp)} < NOW()", Review
[GitHub] [flink] zhuzhurk commented on a change in pull request #15312: [FLINK-21331][runtime] Optimize calculating tasks to restart in RestartPipelinedRegionFailoverStrategy
zhuzhurk commented on a change in pull request #15312: URL: https://github.com/apache/flink/pull/15312#discussion_r601034351 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionFailoverStrategy.java ## @@ -176,15 +183,24 @@ public RestartPipelinedRegionFailoverStrategy( // if a needed input result partition is not available, its producer region is involved for (SchedulingExecutionVertex vertex : regionToRestart.getVertices()) { -for (SchedulingResultPartition consumedPartition : vertex.getConsumedResults()) { -if (!resultPartitionAvailabilityChecker.isAvailable( -consumedPartition.getId())) { -SchedulingPipelinedRegion producerRegion = -topology.getPipelinedRegionOfVertex( - consumedPartition.getProducer().getId()); -if (!visitedRegions.contains(producerRegion)) { -visitedRegions.add(producerRegion); -regionsToVisit.add(producerRegion); +for (ConsumedPartitionGroup consumedPartitionGroup : +vertex.getConsumedPartitionGroups()) { +if (!visitedConsumedResultGroups.contains(consumedPartitionGroup)) { + visitedConsumedResultGroups.add(consumedPartitionGroup); +for (IntermediateResultPartitionID consumedPartitionId : +consumedPartitionGroup) { +if (!resultPartitionAvailabilityChecker.isAvailable( Review comment: maybe extract the finding of unvisited partitions out to avoid such deep nested loops? e.g. 
```java
private Iterable<IntermediateResultPartitionID> getUnvisitedConsumedPartitions(
        SchedulingPipelinedRegion region,
        Set<ConsumedPartitionGroup> visitedConsumedResultGroups) {
    final List<ConsumedPartitionGroup> unvisitedConsumedPartitionGroups = new ArrayList<>();
    for (SchedulingExecutionVertex vertex : region.getVertices()) {
        for (ConsumedPartitionGroup consumedPartitionGroup :
                vertex.getConsumedPartitionGroups()) {
            if (visitedConsumedResultGroups.contains(consumedPartitionGroup)) {
                continue;
            }
            visitedConsumedResultGroups.add(consumedPartitionGroup);
            unvisitedConsumedPartitionGroups.add(consumedPartitionGroup);
        }
    }
    return () -> IterableUtils.flatMap(unvisitedConsumedPartitionGroups, Function.identity());
}
```
## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionFailoverStrategy.java ## @@ -193,13 +209,18 @@ public RestartPipelinedRegionFailoverStrategy( // all consumer regions of an involved region should be involved for (SchedulingExecutionVertex vertex : regionToRestart.getVertices()) { Review comment: Similar to the above comment, we can introduce a `getUnvisitedConsumerVertices`.
[jira] [Closed] (FLINK-21930) There are some error for hive_read_writer.md
[ https://issues.apache.org/jira/browse/FLINK-21930?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu closed FLINK-21930. --- Fix Version/s: 1.13.0 Assignee: hehuiyuan Resolution: Fixed Fixed in master: 9bcd56c5ad4220a1008301377715f14c5ef83064 > There are some error for hive_read_writer.md > > > Key: FLINK-21930 > URL: https://issues.apache.org/jira/browse/FLINK-21930 > Project: Flink > Issue Type: Wish > Components: Documentation >Reporter: hehuiyuan >Assignee: hehuiyuan >Priority: Major > Labels: pull-request-available > Fix For: 1.13.0 > > > !https://user-images.githubusercontent.com/18002496/111795099-db2c0580-8901-11eb-929d-dd40c179a948.png|width=406,height=194! > > > h1. fix hive dim doc replace order to o -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-21930) Fix typo in Hive temporal join example
[ https://issues.apache.org/jira/browse/FLINK-21930?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu updated FLINK-21930: Summary: Fix typo in Hive temporal join example (was: There are some error for hive_read_writer.md) > Fix typo in Hive temporal join example > -- > > Key: FLINK-21930 > URL: https://issues.apache.org/jira/browse/FLINK-21930 > Project: Flink > Issue Type: Wish > Components: Documentation, Table SQL / Ecosystem >Reporter: hehuiyuan >Assignee: hehuiyuan >Priority: Major > Labels: pull-request-available > Fix For: 1.13.0 > > > !https://user-images.githubusercontent.com/18002496/111795099-db2c0580-8901-11eb-929d-dd40c179a948.png|width=406,height=194! > > > h1. fix hive dim doc replace order to o
[jira] [Updated] (FLINK-21930) There are some error for hive_read_writer.md
[ https://issues.apache.org/jira/browse/FLINK-21930?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu updated FLINK-21930: Component/s: Table SQL / Ecosystem > There are some error for hive_read_writer.md > > > Key: FLINK-21930 > URL: https://issues.apache.org/jira/browse/FLINK-21930 > Project: Flink > Issue Type: Wish > Components: Documentation, Table SQL / Ecosystem >Reporter: hehuiyuan >Assignee: hehuiyuan >Priority: Major > Labels: pull-request-available > Fix For: 1.13.0 > > > !https://user-images.githubusercontent.com/18002496/111795099-db2c0580-8901-11eb-929d-dd40c179a948.png|width=406,height=194! > > > h1. fix hive dim doc replace order to o
[GitHub] [flink] wuchong merged pull request #15292: [FLINK-21930][DOC]fix hive dim doc replace o to order
wuchong merged pull request #15292: URL: https://github.com/apache/flink/pull/15292
[GitHub] [flink] wuchong commented on pull request #14376: [FLINK-18202][PB format] New Format of protobuf
wuchong commented on pull request #14376: URL: https://github.com/apache/flink/pull/14376#issuecomment-806354798 I think we can simplify the design to just allow users to change the default value per type instead of per column. The HBase connector also has a similar option `null-string-literal`. https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/hbase/#null-string-literal
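A per-type default could surface as format options in the table DDL. The sketch below is purely illustrative — the option names are invented for this example and are not defined by the actual protobuf format — modeled after HBase's `null-string-literal`:

```sql
-- Hypothetical option names, shown only to illustrate the "default value
-- per type" design; they are not part of the actual protobuf format.
CREATE TABLE proto_source (
  id   BIGINT,
  name STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'users',  -- assumed example topic
  'format' = 'protobuf',
  -- one default per SQL type, instead of one option per column:
  'protobuf.default-string-literal' = 'n/a',
  'protobuf.default-int-literal' = '0'
);
```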
[GitHub] [flink] flinkbot edited a comment on pull request #15348: [FLINK-21844][runtime] Do not auto-configure maxParallelism in REACTIVE scheduling mode
flinkbot edited a comment on pull request #15348: URL: https://github.com/apache/flink/pull/15348#issuecomment-805219989 ## CI report: * 548ddea0488e0df73a8fd37234404fb25001c598 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=15413)
[GitHub] [flink] flinkbot edited a comment on pull request #15148: [FLINK-21731] Add benchmarks for DefaultScheduler's creation, scheduling and deploying
flinkbot edited a comment on pull request #15148: URL: https://github.com/apache/flink/pull/15148#issuecomment-796572621 ## CI report: * cd1977e701621bdd1f827dce3f0b463b100958c0 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=15335) Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=15378) * 9e6f3acd643ac1e479af96b8073c3c0d72952d75 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=15419) * 515a4b2e72d8d0a37d208dfddca0bf89cc642b78 UNKNOWN
[GitHub] [flink] Thesharing commented on a change in pull request #15148: [FLINK-21731] Add benchmarks for DefaultScheduler's creation, scheduling and deploying
Thesharing commented on a change in pull request #15148: URL: https://github.com/apache/flink/pull/15148#discussion_r601026918 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/e2e/SchedulerBenchmarkBase.java ## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.flink.runtime.scheduler.benchmark.e2e; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; +import org.apache.flink.runtime.jobmaster.JobMasterId; +import org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway; +import org.apache.flink.runtime.jobmaster.slotpool.LocationPreferenceSlotSelectionStrategy; +import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider; +import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl; +import 
org.apache.flink.runtime.jobmaster.slotpool.SlotPoolBuilder; +import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl; +import org.apache.flink.runtime.scheduler.DefaultScheduler; +import org.apache.flink.runtime.scheduler.SchedulerTestingUtils; +import org.apache.flink.runtime.scheduler.benchmark.JobConfiguration; +import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway; +import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder; +import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; +import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; + +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.apache.flink.runtime.scheduler.benchmark.SchedulerBenchmarkUtils.createDefaultJobVertices; +import static org.apache.flink.runtime.scheduler.benchmark.SchedulerBenchmarkUtils.createJobGraph; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + +/** + * The base class of benchmarks related to {@link DefaultScheduler}'s creation, scheduling and + * deploying. 
+ */ +public class SchedulerBenchmarkBase { + +ScheduledExecutorService scheduledExecutorService; +ComponentMainThreadExecutor mainThreadExecutor; + +JobGraph jobGraph; +PhysicalSlotProvider physicalSlotProvider; + +public void setup(JobConfiguration jobConfiguration) throws Exception { +scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); +mainThreadExecutor = + ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor( +scheduledExecutorService); + +final List jobVertices = createDefaultJobVertices(jobConfiguration); +jobGraph = createJobGraph(jobVertices, jobConfiguration); + +physicalSlotProvider = +createPhysicalSlotProvider( +jobConfiguration, jobVertices.size(), mainThreadExecutor); +} + +public void teardown() throws Exception { +if (scheduledExecutorService != null) { +scheduledExecutorService.shutdownNow(); +} +} + +static PhysicalSlotProvider createPhysicalSlotProvider( +JobConfiguration jobConfiguration, +int numberOfJobVertices, +ComponentMainThreadExecutor mainThreadExecutor) +throws Exception { +final int slotPoolSize = jobConfiguration.getParallelism() * numberOfJobVertices; + +final SlotPoolImpl slotPool = new SlotPoolBuilder(mainThreadExecutor).build(); +final TestingTaskExecutorGateway testingTaskExecutorGateway = +new
[GitHub] [flink] zhuzhurk commented on a change in pull request #15148: [FLINK-21731] Add benchmarks for DefaultScheduler's creation, scheduling and deploying
zhuzhurk commented on a change in pull request #15148: URL: https://github.com/apache/flink/pull/15148#discussion_r601026373 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/e2e/SchedulerBenchmarkBase.java ## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.flink.runtime.scheduler.benchmark.e2e; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; +import org.apache.flink.runtime.jobmaster.JobMasterId; +import org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway; +import org.apache.flink.runtime.jobmaster.slotpool.LocationPreferenceSlotSelectionStrategy; +import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider; +import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl; +import 
org.apache.flink.runtime.jobmaster.slotpool.SlotPoolBuilder; +import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl; +import org.apache.flink.runtime.scheduler.DefaultScheduler; +import org.apache.flink.runtime.scheduler.SchedulerTestingUtils; +import org.apache.flink.runtime.scheduler.benchmark.JobConfiguration; +import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway; +import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder; +import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; +import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; + +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.apache.flink.runtime.scheduler.benchmark.SchedulerBenchmarkUtils.createDefaultJobVertices; +import static org.apache.flink.runtime.scheduler.benchmark.SchedulerBenchmarkUtils.createJobGraph; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + +/** + * The base class of benchmarks related to {@link DefaultScheduler}'s creation, scheduling and + * deploying. 
+ */ +public class SchedulerBenchmarkBase { + +ScheduledExecutorService scheduledExecutorService; +ComponentMainThreadExecutor mainThreadExecutor; + +JobGraph jobGraph; +PhysicalSlotProvider physicalSlotProvider; + +public void setup(JobConfiguration jobConfiguration) throws Exception { +scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); +mainThreadExecutor = + ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor( +scheduledExecutorService); + +final List jobVertices = createDefaultJobVertices(jobConfiguration); +jobGraph = createJobGraph(jobVertices, jobConfiguration); + +physicalSlotProvider = +createPhysicalSlotProvider( +jobConfiguration, jobVertices.size(), mainThreadExecutor); +} + +public void teardown() throws Exception { +if (scheduledExecutorService != null) { +scheduledExecutorService.shutdownNow(); +} +} + +static PhysicalSlotProvider createPhysicalSlotProvider( Review comment: can be private ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/e2e/SchedulerBenchmarkBase.java ## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache
[GitHub] [flink] KurtYoung commented on a change in pull request #15356: [FLINK-21947][csv] Support TIMESTAMP_LTZ type in CSV format
KurtYoung commented on a change in pull request #15356: URL: https://github.com/apache/flink/pull/15356#discussion_r601006233

## File path: flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java

## @@ -316,7 +321,11 @@ private static RuntimeConverter createRuntimeConverter(
         } else if (info.equals(Types.LOCAL_TIME)) {
             return (node) -> Time.valueOf(node.asText()).toLocalTime();
         } else if (info.equals(Types.LOCAL_DATE_TIME)) {
-            return (node) -> Timestamp.valueOf(node.asText()).toLocalDateTime();
+            return (node) -> LocalDateTime.parse(node.asText(), SQL_TIMESTAMP_FORMAT);
+        } else if (info.equals(Types.INSTANT)) {
+            return (node) ->
+                    LocalDateTime.parse(node.asText(), SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT)
+                            .toInstant(ZoneOffset.UTC);

Review comment:
    I'm not sure we can use UTC here.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
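The reviewer's UTC concern can be made concrete: a zone-less timestamp string fixes a wall-clock time but not a point on the timeline, so whichever zone the parser assumes changes the resulting `Instant`. A minimal standalone sketch with the plain JDK (the input string and zone are made up for illustration, this is not the Flink code under review):

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

public class ZoneChoiceDemo {
    public static void main(String[] args) {
        // A zone-less timestamp as it might appear in a CSV field.
        LocalDateTime wallClock = LocalDateTime.parse("2021-03-25T05:49:19");

        // Interpreting the same wall-clock time under different zones
        // yields different instants on the timeline.
        Instant asUtc = wallClock.toInstant(ZoneOffset.UTC);
        Instant asShanghai = wallClock.atZone(ZoneId.of("Asia/Shanghai")).toInstant();

        // The two instants differ by the zone offset (8 hours here).
        System.out.println(asUtc.equals(asShanghai)); // false
    }
}
```

This is why hard-coding `ZoneOffset.UTC` is only safe if the format guarantees the serialized string was itself produced in UTC; otherwise the session or local time zone would be the consistent choice.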
[jira] [Updated] (FLINK-21968) Calcite parser doesn't support to parse "-- test"
[ https://issues.apache.org/jira/browse/FLINK-21968?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shengkai Fang updated FLINK-21968: -- Affects Version/s: 1.13.0

> Calcite parser doesn't support to parse "-- test"
> -
>
> Key: FLINK-21968
> URL: https://issues.apache.org/jira/browse/FLINK-21968
> Project: Flink
> Issue Type: Bug
> Affects Versions: 1.13.0
> Reporter: Shengkai Fang
> Priority: Major
>
> Please add a test in the {{FlinkSqlParserImplTest.java}}.
> {code:java}
> @Test
> public void testParseComment() {
>     sql("-- test").ok("-- test");
> }
> {code}
> I think we can fix this by using {{ExtendedParseStrategy}} to clear the
> string that matches "--.*$";
> {{ExtendedParseStrategy}} is introduced in FLINK-21836

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] fsk119 commented on pull request #15265: [FLINK-21836][table-api] Introduce ParseStrategyParser
fsk119 commented on pull request #15265: URL: https://github.com/apache/flink/pull/15265#issuecomment-806343421 I think we can rebase this PR until all concerns are resolved. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-21968) Calcite parser doesn't support to parse "-- test"
[ https://issues.apache.org/jira/browse/FLINK-21968?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shengkai Fang updated FLINK-21968: -- Description:

Please add a test in the {{FlinkSqlParserImplTest.java}}.

{code:java}
@Test
public void testParseComment() {
    sql("-- test").ok("-- test");
}
{code}

I think we can fix this by using {{ExtendedParseStrategy}} to clear the string that matches "--.*$". {{ExtendedParseStrategy}} is introduced in FLINK-21836.

was:

Please add a test in the {{FlinkSqlParserImplTest.java}}.

{code:java}
@Test
public void testParseComment() {
    sql("-- test").ok("-- test");
}
{code}

I think we can fix this by using a regex to clear the string that matches "--.*$".

> Calcite parser doesn't support to parse "-- test"
> -
>
> Key: FLINK-21968
> URL: https://issues.apache.org/jira/browse/FLINK-21968
> Project: Flink
> Issue Type: Bug
> Reporter: Shengkai Fang
> Priority: Major
>
> Please add a test in the {{FlinkSqlParserImplTest.java}}.
> {code:java}
> @Test
> public void testParseComment() {
>     sql("-- test").ok("-- test");
> }
> {code}
> I think we can fix this by using {{ExtendedParseStrategy}} to clear the
> string that matches "--.*$";
> {{ExtendedParseStrategy}} is introduced in FLINK-21836

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-21788) Throw PartitionNotFoundException if the partition file has been lost for blocking shuffle
[ https://issues.apache.org/jira/browse/FLINK-21788?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhu Zhu updated FLINK-21788: Priority: Critical (was: Blocker) > Throw PartitionNotFoundException if the partition file has been lost for > blocking shuffle > - > > Key: FLINK-21788 > URL: https://issues.apache.org/jira/browse/FLINK-21788 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Reporter: Yingjie Cao >Assignee: Yingjie Cao >Priority: Critical > Labels: pull-request-available > Fix For: 1.13.0 > > > Currently, if the partition file has been lost for blocking shuffle, > FileNotFoundException will be thrown and the partition data is not > regenerated, so failover can not recover the job. It should throw > PartitionNotFoundException instead. -- This message was sent by Atlassian Jira (v8.3.4#803005)
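The fix the issue describes amounts to translating a low-level I/O error into a partition-scoped, recoverable one, so that failover treats a lost file as "regenerate the partition" rather than an unrecoverable failure. A hedged standalone sketch of that translation — the exception class and method names below are illustrative stand-ins, not Flink's actual implementation:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class PartitionReadDemo {
    // Illustrative stand-in; Flink has its own PartitionNotFoundException type.
    static class PartitionNotFoundException extends RuntimeException {
        PartitionNotFoundException(String partitionId, Throwable cause) {
            super("Partition " + partitionId + " not found", cause);
        }
    }

    // Reading a lost partition file surfaces as a partition-level error that a
    // scheduler could react to by re-running the producing task.
    static byte[] readPartitionFile(Path file, String partitionId) {
        try {
            return Files.readAllBytes(file);
        } catch (IOException e) {
            // Translate instead of letting FileNotFoundException escape.
            throw new PartitionNotFoundException(partitionId, e);
        }
    }

    public static void main(String[] args) {
        try {
            readPartitionFile(Paths.get("/no/such/partition-file"), "p-0");
        } catch (PartitionNotFoundException expected) {
            System.out.println(expected.getMessage()); // Partition p-0 not found
        }
    }
}
```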
[GitHub] [flink] flinkbot edited a comment on pull request #15259: [FLINK-20757][network] Optimize data broadcast for sort-merge blocking shuffle
flinkbot edited a comment on pull request #15259: URL: https://github.com/apache/flink/pull/15259#issuecomment-801679503 ## CI report: * b1b7558454e1df9f2ae913f6363cd823e1877c4c Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=15367) Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=15420) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-21788) Throw PartitionNotFoundException if the partition file has been lost for blocking shuffle
[ https://issues.apache.org/jira/browse/FLINK-21788?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhu Zhu updated FLINK-21788: Priority: Blocker (was: Major) > Throw PartitionNotFoundException if the partition file has been lost for > blocking shuffle > - > > Key: FLINK-21788 > URL: https://issues.apache.org/jira/browse/FLINK-21788 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Reporter: Yingjie Cao >Assignee: Yingjie Cao >Priority: Blocker > Labels: pull-request-available > Fix For: 1.13.0 > > > Currently, if the partition file has been lost for blocking shuffle, > FileNotFoundException will be thrown and the partition data is not > regenerated, so failover can not recover the job. It should throw > PartitionNotFoundException instead. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-21968) Calcite parser doesn't support to parse "-- test"
Shengkai Fang created FLINK-21968: - Summary: Calcite parser doesn't support to parse "-- test" Key: FLINK-21968 URL: https://issues.apache.org/jira/browse/FLINK-21968 Project: Flink Issue Type: Bug Reporter: Shengkai Fang

Please add a test in the {{FlinkSqlParserImplTest.java}}.

{code:java}
@Test
public void testParseComment() {
    sql("-- test").ok("-- test");
}
{code}

I think we can fix this by using a regex to clear the string that matches "--.*$".

-- This message was sent by Atlassian Jira (v8.3.4#803005)
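The regex idea suggested above can be sketched as follows. This is illustrative only: a naive "--.*$" pattern also strips "--" sequences that occur inside string literals, which a real fix (e.g. one based on a parse strategy) would have to guard against.

```java
import java.util.regex.Pattern;

public class CommentStripDemo {
    // The pattern the issue proposes, applied per line of the statement.
    // Caveat: it does not distinguish "--" inside quoted string literals.
    private static final Pattern LINE_COMMENT = Pattern.compile("--.*$", Pattern.MULTILINE);

    static String stripLineComments(String sql) {
        return LINE_COMMENT.matcher(sql).replaceAll("");
    }

    public static void main(String[] args) {
        System.out.println(stripLineComments("SELECT 1 -- test").trim()); // SELECT 1
        // The caveat in action: the literal is damaged by the naive regex.
        System.out.println(stripLineComments("SELECT '--not a comment'"));
    }
}
```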
[jira] [Updated] (FLINK-21788) Throw PartitionNotFoundException if the partition file has been lost for blocking shuffle
[ https://issues.apache.org/jira/browse/FLINK-21788?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhu Zhu updated FLINK-21788: Parent: (was: FLINK-19614) Issue Type: Bug (was: Sub-task) > Throw PartitionNotFoundException if the partition file has been lost for > blocking shuffle > - > > Key: FLINK-21788 > URL: https://issues.apache.org/jira/browse/FLINK-21788 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Reporter: Yingjie Cao >Assignee: Yingjie Cao >Priority: Major > Labels: pull-request-available > Fix For: 1.13.0 > > > Currently, if the partition file has been lost for blocking shuffle, > FileNotFoundException will be thrown and the partition data is not > regenerated, so failover can not recover the job. It should throw > PartitionNotFoundException instead. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot edited a comment on pull request #15148: [FLINK-21731] Add benchmarks for DefaultScheduler's creation, scheduling and deploying
flinkbot edited a comment on pull request #15148: URL: https://github.com/apache/flink/pull/15148#issuecomment-796572621 ## CI report: * cd1977e701621bdd1f827dce3f0b463b100958c0 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=15335) Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=15378) * 9e6f3acd643ac1e479af96b8073c3c0d72952d75 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-21788) Throw PartitionNotFoundException if the partition file has been lost for blocking shuffle
[ https://issues.apache.org/jira/browse/FLINK-21788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17308329#comment-17308329 ] Zhu Zhu commented on FLINK-21788: - which versions are affected by this problem [~kevin.cyj]? > Throw PartitionNotFoundException if the partition file has been lost for > blocking shuffle > - > > Key: FLINK-21788 > URL: https://issues.apache.org/jira/browse/FLINK-21788 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Network >Reporter: Yingjie Cao >Assignee: Yingjie Cao >Priority: Major > Labels: pull-request-available > Fix For: 1.13.0 > > > Currently, if the partition file has been lost for blocking shuffle, > FileNotFoundException will be thrown and the partition data is not > regenerated, so failover can not recover the job. It should throw > PartitionNotFoundException instead. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-21788) Throw PartitionNotFoundException if the partition file has been lost for blocking shuffle
[ https://issues.apache.org/jira/browse/FLINK-21788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17308329#comment-17308329 ] Zhu Zhu edited comment on FLINK-21788 at 3/25/21, 3:44 AM: --- Which versions are affected by this problem [~kevin.cyj]? was (Author: zhuzh): which versions are affected by this problem [~kevin.cyj]? > Throw PartitionNotFoundException if the partition file has been lost for > blocking shuffle > - > > Key: FLINK-21788 > URL: https://issues.apache.org/jira/browse/FLINK-21788 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Network >Reporter: Yingjie Cao >Assignee: Yingjie Cao >Priority: Major > Labels: pull-request-available > Fix For: 1.13.0 > > > Currently, if the partition file has been lost for blocking shuffle, > FileNotFoundException will be thrown and the partition data is not > regenerated, so failover can not recover the job. It should throw > PartitionNotFoundException instead. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (FLINK-21788) Throw PartitionNotFoundException if the partition file has been lost for blocking shuffle
[ https://issues.apache.org/jira/browse/FLINK-21788?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhu Zhu reassigned FLINK-21788: --- Assignee: Yingjie Cao > Throw PartitionNotFoundException if the partition file has been lost for > blocking shuffle > - > > Key: FLINK-21788 > URL: https://issues.apache.org/jira/browse/FLINK-21788 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Network >Reporter: Yingjie Cao >Assignee: Yingjie Cao >Priority: Major > Labels: pull-request-available > Fix For: 1.13.0 > > > Currently, if the partition file has been lost for blocking shuffle, > FileNotFoundException will be thrown and the partition data is not > regenerated, so failover can not recover the job. It should throw > PartitionNotFoundException instead. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] wsry commented on pull request #15259: [FLINK-20757][network] Optimize data broadcast for sort-merge blocking shuffle
wsry commented on pull request #15259: URL: https://github.com/apache/flink/pull/15259#issuecomment-806341129 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Closed] (FLINK-21951) Fix wrong if condition in BufferReaderWriterUtil#writeBuffers
[ https://issues.apache.org/jira/browse/FLINK-21951?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhu Zhu closed FLINK-21951. --- Assignee: Yingjie Cao Resolution: Fixed Fixed via 3533d9822f0f653f2848b31de0fc239b3a12dcef > Fix wrong if condition in BufferReaderWriterUtil#writeBuffers > - > > Key: FLINK-21951 > URL: https://issues.apache.org/jira/browse/FLINK-21951 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Network >Reporter: Yingjie Cao >Assignee: Yingjie Cao >Priority: Major > Labels: pull-request-available > Fix For: 1.13.0 > > > The wrong if condition in BufferReaderWriterUtil#writeBuffers may lead to > data loss when bulk writing a large number of buffers into a file. This bug was > introduced in 1.9, but previously only a small amount of data was written, so > the bug never surfaced. In 1.13, the sort-merge shuffle uses this path to write > more data, which triggers the bug. -- This message was sent by Atlassian Jira (v8.3.4#803005)
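The general pitfall behind this class of bug can be illustrated without the Flink code: `FileChannel.write(ByteBuffer[])` is allowed to write fewer bytes than requested, so completion must be re-checked in a loop — a one-shot `if` can silently drop the tail of the data once writes get large enough to be split. A hedged sketch (not the actual `BufferReaderWriterUtil` code):

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class GatheringWriteDemo {
    // Loop until every requested byte reaches the channel; a single
    // conditional check here is exactly the kind of bug described above.
    static void writeFully(FileChannel channel, ByteBuffer[] buffers, long totalBytes)
            throws IOException {
        long written = 0;
        while (written < totalBytes) {
            written += channel.write(buffers); // gathering write may be partial
        }
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("gathering-write", ".bin");
        ByteBuffer[] buffers = {
            ByteBuffer.wrap(new byte[] {1, 2, 3}), ByteBuffer.wrap(new byte[] {4, 5})
        };
        try (FileChannel channel = FileChannel.open(tmp, StandardOpenOption.WRITE)) {
            writeFully(channel, buffers, 5);
            System.out.println(channel.size()); // 5
        } finally {
            Files.deleteIfExists(tmp);
        }
    }
}
```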
[GitHub] [flink] zhuzhurk merged pull request #15364: [FLINK-21951][network] Fix the wrong if condition in BufferReaderWriterUtil#writeBuffers
zhuzhurk merged pull request #15364: URL: https://github.com/apache/flink/pull/15364 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-21539) 'SQL Client end-to-end test (Blink planner) Elasticsearch (v6.3.1)' fails during download
[ https://issues.apache.org/jira/browse/FLINK-21539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17308325#comment-17308325 ] Guowei Ma commented on FLINK-21539: --- https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=15411=logs=6caf31d6-847a-526e-9624-468e053467d6=7d4f7375-52df-5ce0-457f-b2ffbb2289a4=6628 > 'SQL Client end-to-end test (Blink planner) Elasticsearch (v6.3.1)' fails > during download > - > > Key: FLINK-21539 > URL: https://issues.apache.org/jira/browse/FLINK-21539 > Project: Flink > Issue Type: Bug > Components: Connectors / ElasticSearch, Table SQL / Ecosystem, Tests >Affects Versions: 1.11.3, 1.13.0 >Reporter: Dawid Wysakowicz >Priority: Major > Labels: test-stability > > https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=13906=logs=91bf6583-3fb2-592f-e4d4-d79d79c3230a=03dbd840-5430-533d-d1a7-05d0ebe03873 > {code} > Downloading Elasticsearch from > https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.3.1.tar.gz > ... > % Total% Received % Xferd Average Speed TimeTime Time > Current > Dload Upload Total SpentLeft Speed > 0 00 00 0 0 0 --:--:-- --:--:-- --:--:-- 0 > 0 87.1M0 9970 0 3298 0 7:42:02 --:--:-- 7:42:02 3290 > 17 87.1M 17 15.6M0 0 11.6M 0 0:00:07 0:00:01 0:00:06 11.6M > 31 87.1M 31 27.8M0 0 12.1M 0 0:00:07 0:00:02 0:00:05 12.1M > 49 87.1M 49 42.9M0 0 12.9M 0 0:00:06 0:00:03 0:00:03 12.9M > 67 87.1M 67 58.9M0 0 13.5M 0 0:00:06 0:00:04 0:00:02 13.5M > 87 87.1M 87 75.8M0 0 14.3M 0 0:00:06 0:00:05 0:00:01 15.1M > 87 87.1M 87 75.9M0 0 14.2M 0 0:00:06 0:00:05 0:00:01 15.1M > curl: (56) GnuTLS recv error (-110): The TLS connection was non-properly > terminated. > % Total% Received % Xferd Average Speed TimeTime Time > Current > Dload Upload Total SpentLeft Speed > 0 00 00 0 0 0 --:--:-- --:--:-- --:--:-- > 0curl: (7) Failed to connect to localhost port 9200: Connection refused > [FAIL] Test script contains errors. > Checking for errors... > No errors in log files. 
> Checking for exceptions... > No exceptions in log files. > Checking for non-empty .out files... > grep: > /home/vsts/work/1/s/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/log/*.out: > No such file or directory > No non-empty .out files. > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] zhuzhurk commented on a change in pull request #15148: [FLINK-21731] Add benchmarks for DefaultScheduler's creation, scheduling and deploying
zhuzhurk commented on a change in pull request #15148: URL: https://github.com/apache/flink/pull/15148#discussion_r601012694

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/e2e/SchedulingAndDeployingBenchmark.java

## @@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.scheduler.benchmark.e2e;
+
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider;
+import org.apache.flink.runtime.scheduler.DefaultScheduler;
+import org.apache.flink.runtime.scheduler.benchmark.JobConfiguration;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+
+import static org.apache.flink.runtime.scheduler.benchmark.SchedulerBenchmarkUtils.createDefaultJobVertices;
+import static org.apache.flink.runtime.scheduler.benchmark.SchedulerBenchmarkUtils.createJobGraph;
+
+/**
+ * The benchmark of scheduling and deploying tasks in a STREAMING/BATCH job. The related method is
+ * {@link DefaultScheduler#startScheduling}.
+ */
+public class SchedulingAndDeployingBenchmark extends SchedulerBenchmarkBase {
+
+    private DefaultScheduler scheduler;
+
+    public void setup(JobConfiguration jobConfiguration) throws Exception {
+        scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();

Review comment:
    Looks to me that the `mainThreadExecutor`, `jobGraph`, `physicalSlotProvider` can all be part of the `SchedulerBenchmarkBase`.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-21967) Add document on the operation of blocking result partition
Yun Gao created FLINK-21967: --- Summary: Add document on the operation of blocking result partition Key: FLINK-21967 URL: https://issues.apache.org/jira/browse/FLINK-21967 Project: Flink Issue Type: Task Components: Documentation, Runtime / Network Affects Versions: 1.13.0 Reporter: Yun Gao Fix For: 1.13.0 We would need one page to describe the rough behavior, choice and recommend configuration for the blocking result partitions, to help user to better get into the batch jobs and also in case users might met the issue like https://issues.apache.org/jira/browse/FLINK-21416 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot edited a comment on pull request #15362: [FLINK-21939][python] Support batch mode in Python DataStream API for process operation and reduce operation
flinkbot edited a comment on pull request #15362: URL: https://github.com/apache/flink/pull/15362#issuecomment-805817140 ## CI report: * a46a6236b098fdc7b89755d4b072053134d42507 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=15394) * f65f9751b0e4a78d617f3c5a6d483d73580cfe0d Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=15417) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #15161: [FLINK-20114][connector/kafka,common] Fix a few KafkaSource-related bugs
flinkbot edited a comment on pull request #15161: URL: https://github.com/apache/flink/pull/15161#issuecomment-797177953 ## CI report: * d3f59e302b9860d49ce79252aeb8bd1decaeeabf UNKNOWN * 3bbb86b12a43044ce3f876401c98f28c87858fbe Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=15366) * c58bb5faec3f07e0b66d0e042e3e9c6082fb8e9c Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=15416) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] KurtYoung commented on a change in pull request #15356: [FLINK-21947][csv] Support TIMESTAMP_LTZ type in CSV format
KurtYoung commented on a change in pull request #15356: URL: https://github.com/apache/flink/pull/15356#discussion_r601004944

## File path: flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/TimeFormats.java

## @@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeFormatterBuilder;
+import java.time.temporal.ChronoField;
+
+/** Time formats and timestamp formats respecting SQL specification. */
+class TimeFormats {

Review comment:
    We shouldn't copy this utility from `flink-json`. How about reusing it from a new module like `flink-format-common`?

## File path: flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java

## @@ -316,7 +321,11 @@ private static RuntimeConverter createRuntimeConverter(
         } else if (info.equals(Types.LOCAL_TIME)) {
             return (node) -> Time.valueOf(node.asText()).toLocalTime();
         } else if (info.equals(Types.LOCAL_DATE_TIME)) {
-            return (node) -> Timestamp.valueOf(node.asText()).toLocalDateTime();
+            return (node) -> LocalDateTime.parse(node.asText(), SQL_TIMESTAMP_FORMAT);
+        } else if (info.equals(Types.INSTANT)) {
+            return (node) ->
+                    LocalDateTime.parse(node.asText(), SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT)
+                            .toInstant(ZoneOffset.UTC);

Review comment:
    I'm not sure we can use UTC here.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-21966) Support Kinesis connector in Python DataStream API.
Shuiqiang Chen created FLINK-21966: -- Summary: Support Kinesis connector in Python DataStream API. Key: FLINK-21966 URL: https://issues.apache.org/jira/browse/FLINK-21966 Project: Flink Issue Type: Sub-task Components: API / Python Reporter: Shuiqiang Chen Fix For: 1.13.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Closed] (FLINK-20758) Use region file mechanism for shuffle data reading before we switch to managed memory
[ https://issues.apache.org/jira/browse/FLINK-20758?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yingjie Cao closed FLINK-20758. --- Resolution: Won't Fix > Use region file mechanism for shuffle data reading before we switch to > managed memory > - > > Key: FLINK-20758 > URL: https://issues.apache.org/jira/browse/FLINK-20758 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Network >Affects Versions: 1.12.0 >Reporter: Yingjie Cao >Assignee: Yingjie Cao >Priority: Major > Labels: usability > > FLINK-15981 implemented region file based data reader to solve the direct > memory OOM issue introduced by usage of unmanaged direct memory, however only > for BoundedBlockingResultPartition. We can introduce it to sort-merge based > blocking shuffle to avoid the similar direct memory OOM problem which can > improve the usability a lot. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot edited a comment on pull request #15362: [FLINK-21939][python] Support batch mode in Python DataStream API for process operation and reduce operation
flinkbot edited a comment on pull request #15362: URL: https://github.com/apache/flink/pull/15362#issuecomment-805817140 ## CI report: * a46a6236b098fdc7b89755d4b072053134d42507 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=15394) * f65f9751b0e4a78d617f3c5a6d483d73580cfe0d UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #15161: [FLINK-20114][connector/kafka,common] Fix a few KafkaSource-related bugs
flinkbot edited a comment on pull request #15161: URL: https://github.com/apache/flink/pull/15161#issuecomment-797177953 ## CI report: * d3f59e302b9860d49ce79252aeb8bd1decaeeabf UNKNOWN * 3bbb86b12a43044ce3f876401c98f28c87858fbe Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=15366) * c58bb5faec3f07e0b66d0e042e3e9c6082fb8e9c UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-20985) end the stream-job in docker container will remain some zombie processes in 1.11, these useless processes not be maked as zombie but still exist in 1.12
[ https://issues.apache.org/jira/browse/FLINK-20985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17308307#comment-17308307 ] Dian Fu commented on FLINK-20985: - Are these processes managed by Flink? Could you give a clearer description of what these processes are? > end the stream-job in docker container will remain some zombie processes in > 1.11, these useless processes not be maked as zombie but still exist in 1.12 > - > > Key: FLINK-20985 > URL: https://issues.apache.org/jira/browse/FLINK-20985 > Project: Flink > Issue Type: Improvement > Components: flink-docker >Reporter: he jie >Priority: Major > > When I use PyFlink in Docker, ending the stream job leaves some processes behind. > These processes are used for the endpoint and temporary Python logs. In Flink 1.11 > these processes are marked as zombies; in Flink 1.12 they are not marked but still > exist. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-21942) KubernetesLeaderRetrievalDriver not closed after terminated which lead to connection leak
[ https://issues.apache.org/jira/browse/FLINK-21942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17308299#comment-17308299 ] Yi Tang commented on FLINK-21942: - [~fly_in_gis] Sure, I've uploaded a thread dump ( [^jstack.l] ) corresponding to the first scenario. You can focus on the OkHttp and OkHttp WebSocket threads first. I don't think increasing cluster.io-pool.size will help, so it is still 16 as shown in the stack. > KubernetesLeaderRetrievalDriver not closed after terminated which lead to > connection leak > - > > Key: FLINK-21942 > URL: https://issues.apache.org/jira/browse/FLINK-21942 > Project: Flink > Issue Type: Bug >Reporter: Yi Tang >Priority: Major > Attachments: image-2021-03-24-18-08-30-196.png, > image-2021-03-24-18-08-42-116.png, jstack.l > > > It looks like the KubernetesLeaderRetrievalDriver is not closed even after the > KubernetesLeaderElectionDriver is closed and the job reaches a globally terminated > state. As a result, many ConfigMap watches stay active, holding open connections to > K8s. When the number of connections exceeds the maximum concurrent requests, new > ConfigMap watches cannot be started, and eventually all newly submitted jobs time out. > [~fly_in_gis] [~trohrmann] This may be related to FLINK-20695, could you > confirm this issue? > When many jobs are running in the same session cluster, the ConfigMap watches need > to stay active. Maybe we should merge all ConfigMap watches? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-21942) KubernetesLeaderRetrievalDriver not closed after terminated which lead to connection leak
[ https://issues.apache.org/jira/browse/FLINK-21942?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yi Tang updated FLINK-21942: Attachment: jstack.l > KubernetesLeaderRetrievalDriver not closed after terminated which lead to > connection leak > - > > Key: FLINK-21942 > URL: https://issues.apache.org/jira/browse/FLINK-21942 > Project: Flink > Issue Type: Bug >Reporter: Yi Tang >Priority: Major > Attachments: image-2021-03-24-18-08-30-196.png, > image-2021-03-24-18-08-42-116.png, jstack.l > > > Looks like KubernetesLeaderRetrievalDriver is not closed even if the > KubernetesLeaderElectionDriver is closed and job reach globally terminated. > This will lead to many configmap watching be still active with connections to > K8s. > When the connections exceeds max concurrent requests, those new configmap > watching can not be started. Finally leads to all new jobs submitted timeout. > [~fly_in_gis] [~trohrmann] This may be related to FLINK-20695, could you > confirm this issue? > But when many jobs are running in same session cluster, the config map > watching is required to be active. Maybe we should merge all config maps > watching? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] lirui-apache commented on pull request #15274: [FLINK-21829][Hive Connect]Fix to throw exception when custom hadoop conf path does not exist
lirui-apache commented on pull request #15274: URL: https://github.com/apache/flink/pull/15274#issuecomment-806314134 > > HiveCatalogFactoryTest > > Hi @lirui-apache , is it ok? > > ``` > @Test > public void testCreateHiveCatalogWithIllegalHadoopConfDir() throws IOException { > final String catalogName = "mycatalog"; > > final String illegalHadoopConfDir = " " + tempFolder.newFolder().getAbsolutePath(); > > try { > final HiveCatalog hiveCatalog = > HiveTestUtils.createHiveCatalog( > catalogName, CONF_DIR.getPath(), illegalHadoopConfDir, null); > } catch (Exception e) { > assertEquals(e.getClass(), CatalogException.class); > } > } > ``` Hi @hehuiyuan , two suggestions about the test: 1. You should create the HiveCatalog with `HiveCatalogFactory` like the other test cases in this class, because we should verify the behavior of `HiveCatalogFactory` rather than `HiveTestUtils`. 2. Instead of catching all exceptions and asserting the type is `CatalogException`, you can catch `CatalogException` only. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
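The quoted test also passes vacuously when `create_catalog` throws nothing at all, which is a further reason to prefer the narrow catch plus an explicit failure. A minimal, self-contained sketch of the pattern in Python (the `create_catalog` stand-in and the exception class are hypothetical, not Flink API):

```python
import os

class CatalogException(Exception):
    """Stand-in for Flink's CatalogException (illustrative only)."""

def create_catalog(hadoop_conf_dir):
    # Hypothetical factory: rejects a hadoop conf dir that does not exist.
    if not os.path.isdir(hadoop_conf_dir):
        raise CatalogException("Failed to load hadoop conf from " + hadoop_conf_dir)
    return object()

def test_illegal_hadoop_conf_dir():
    # Catch only the expected type; any other exception propagates and
    # fails the test instead of being mis-asserted.
    try:
        create_catalog(" /no/such/dir")
    except CatalogException:
        return True  # expected failure observed
    raise AssertionError("expected a CatalogException")  # vacuous-pass guard
```

The same shape in JUnit would be a `fail()` after the call inside the `try`, or `assertThrows(CatalogException.class, ...)`.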
[jira] [Updated] (FLINK-20374) Wrong result when shuffling changelog stream on non-primary-key columns
[ https://issues.apache.org/jira/browse/FLINK-20374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu updated FLINK-20374: Description: This is reported from user-zh ML: http://apache-flink.147419.n8.nabble.com/flink-1-11-2-cdc-cdc-sql-sink-save-point-job-sink-td8593.html {code:sql} CREATE TABLE test ( `id` INT, `name` VARCHAR(255), `time` TIMESTAMP(3), `status` INT, PRIMARY KEY(id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'localhost', 'port' = '3306', 'username' = 'root', 'password' = '1', 'database-name' = 'ai_audio_lyric_task', 'table-name' = 'test' ); CREATE TABLE status ( `id` INT, `name` VARCHAR(255), PRIMARY KEY(id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'localhost', 'port' = '3306', 'username' = 'root', 'password' = '1', 'database-name' = 'ai_audio_lyric_task', 'table-name' = 'status' ); -- output CREATE TABLE test_status ( `id` INT, `name` VARCHAR(255), `time` TIMESTAMP(3), `status` INT, `status_name` VARCHAR(255), PRIMARY KEY(id) NOT ENFORCED ) WITH ( 'connector' = 'elasticsearch-7', 'hosts' = 'xxx', 'index' = 'xxx', 'username' = 'xxx', 'password' = 'xxx', 'sink.bulk-flush.backoff.max-retries' = '10', 'sink.bulk-flush.backoff.strategy' = 'CONSTANT', 'sink.bulk-flush.max-actions' = '5000', 'sink.bulk-flush.max-size' = '10mb', 'sink.bulk-flush.interval' = '1s' ); INSERT INTO test_status SELECT t.*, s.name FROM test AS t LEFT JOIN status AS s ON t.status = s.id; {code} Data in mysql table: {code} test: 0, name0, 2020-07-06 00:00:00 , 0 1, name1, 2020-07-06 00:00:00 , 1 2, name2, 2020-07-06 00:00:00 , 1 . status 0, status0 1, status1 2, status2 . {code} Operations: 1. start job with parallelism=40, result in test_status sink is correct: {code} 0, name0, 2020-07-06 00:00:00 , 0, status0 1, name1, 2020-07-06 00:00:00 , 1, status1 2, name2, 2020-07-06 00:00:00 , 1, status1 {code} 2. Update {{status}} of {{id=2}} record in table {{test}} from {{1}} to {{2}}. 3. 
Result is not correct because the {{id=2}} record is missing in the result. The reason is that the changelog {{test}} is shuffled on the {{status}} column, which is not the primary key, so ordering can't be guaranteed and the result is wrong. The {{-U[2, name2, 2020-07-06 00:00:00 , 1]}} and {{+U[2, name2, 2020-07-06 00:00:00 , 2]}} records may be shuffled to different join tasks, so the order of the joined results is not guaranteed when they arrive at the sink task. It is possible that {{+U[2, name2, 2020-07-06 00:00:00 , status2]}} arrives first, and then {{-U[2, name2, 2020-07-06 00:00:00 , status1]}}, in which case the {{id=2}} record is missing in Elasticsearch. It seems that we need a changelog ordering mechanism in the planner.
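The ordering hazard described above can be seen with a few lines of plain Python: when the retract/insert pair for the same primary key is hash-partitioned on the join key (`status`) rather than the primary key (`id`), the two records are routed to different parallel tasks, after which their relative order is no longer guaranteed. This is a sketch under simplified assumptions (a toy hash router, not Flink's actual keyBy partitioner):

```python
def route(join_key, parallelism):
    # Toy stand-in for hash-partitioning a changelog record by its join key.
    return hash(join_key) % parallelism

parallelism = 40
# Update of id=2: status changes from 1 to 2, emitted as a retract/insert pair.
retract_status, insert_status = 1, 2

# Shuffled on the non-primary-key column, the pair lands on different
# subtasks, so the sink may observe +U before -U and end up deleting the
# freshly upserted row for id=2.
print(route(retract_status, parallelism), route(insert_status, parallelism))
```

Shuffling on the primary key instead would send both records of the pair through the same channel, preserving their order end to end.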
[GitHub] [flink] lindong28 commented on pull request #15161: [FLINK-20114][connector/kafka,common] Fix a few KafkaSource-related bugs
lindong28 commented on pull request #15161: URL: https://github.com/apache/flink/pull/15161#issuecomment-806310235 @StephanEwen @becketqin Thanks for the review! I think all issues are resolved. Could you take another look?
[GitHub] [flink] lindong28 commented on a change in pull request #15161: [FLINK-20114][connector/kafka,common] Fix a few KafkaSource-related bugs
lindong28 commented on a change in pull request #15161: URL: https://github.com/apache/flink/pull/15161#discussion_r600993395 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java ## @@ -74,7 +74,8 @@ public void start() throws Exception { @Override public void close() throws Exception { closed = true; -coordinator.closeAsync(closingTimeoutMs); +// Wait for coordinator close before destructing any user class loader. +coordinator.closeAsync(closingTimeoutMs).get(); Review comment: @becketqin Thanks for the explanation! It looks like we don't have a clear idea of the root cause of the `ClassNotFoundException`, and there is a potential (though probably unlikely) risk in making the call synchronous when user code has a blocking operation. Regarding the 2nd point above, there could be a tradeoff between the resource leak and blocking the JobManager's scheduler thread. I suppose it is better to understand the resource leak before blocking the JobManager's thread. And from https://issues.apache.org/jira/browse/FLINK-20114, it looks like the ClassNotFoundException is emitted as a WARNING without disrupting the user's job. It seems OK to keep emitting this ClassNotFoundException in the log until we resolve the concerns described above. @StephanEwen @becketqin To stay on the safe side, I have removed this change from this PR.
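The change under discussion, `closeAsync(closingTimeoutMs).get()`, trades asynchrony for a completion guarantee. The effect can be sketched with a thread pool: `result()` (Java's `Future.get()`) blocks the caller until the user close logic finishes, which is what makes it safe to release a user classloader afterwards, and also what blocks the calling thread if user code hangs. Illustrative only, not Flink code:

```python
from concurrent.futures import ThreadPoolExecutor
import time

pool = ThreadPoolExecutor(max_workers=1)
closed = []

def user_close():
    time.sleep(0.05)      # user close logic may block for an arbitrary time
    closed.append(True)

future = pool.submit(user_close)  # analogous to closeAsync(...)
future.result()                   # analogous to .get(): the caller waits here
# Only after result() returns is it safe to tear down resources the close
# logic still depends on (in Flink's case, the user classloader).
pool.shutdown()
```

The concern raised in the review is the flip side: if `user_close` never returns, `result()` pins the caller, which in Flink would be the JobManager's scheduler thread.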
[jira] [Commented] (FLINK-21294) Support state access API for the map/flat_map operation of Python ConnectedStreams
[ https://issues.apache.org/jira/browse/FLINK-21294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17308286#comment-17308286 ] Wei Zhong commented on FLINK-21294: --- Merged via 7e49b1af59341d924eee1742ea59e8c43c3be73e > Support state access API for the map/flat_map operation of Python > ConnectedStreams > -- > > Key: FLINK-21294 > URL: https://issues.apache.org/jira/browse/FLINK-21294 > Project: Flink > Issue Type: Sub-task > Components: API / Python >Reporter: Wei Zhong >Priority: Major > Labels: pull-request-available > Fix For: 1.13.0 > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (FLINK-21294) Support state access API for the map/flat_map operation of Python ConnectedStreams
[ https://issues.apache.org/jira/browse/FLINK-21294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wei Zhong reassigned FLINK-21294: - Assignee: Wei Zhong
[jira] [Closed] (FLINK-21294) Support state access API for the map/flat_map operation of Python ConnectedStreams
[ https://issues.apache.org/jira/browse/FLINK-21294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wei Zhong closed FLINK-21294. - Resolution: Fixed
[GitHub] [flink] WeiZhong94 closed pull request #15149: [FLINK-21294][python] Support state access API for the map/flat_map operation of Python ConnectedStreams
WeiZhong94 closed pull request #15149: URL: https://github.com/apache/flink/pull/15149
[GitHub] [flink] leonardBang commented on a change in pull request #15303: [FLINK-21713][table-api/table-planner] Correct function CURRENT_TIMESTAMP/CURRENT_TIME/CURRENT_DATE/NOW/LOCALTIME/LOCALTIMEST
leonardBang commented on a change in pull request #15303: URL: https://github.com/apache/flink/pull/15303#discussion_r600984396 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala ## @@ -455,16 +455,21 @@ abstract class PlannerBase( * the configuration before planner do optimization with [[ModifyOperation]] or other works. */ protected def validateAndOverrideConfiguration(): Unit = { -if (!config.getConfiguration.get(TableConfigOptions.TABLE_PLANNER).equals(PlannerType.BLINK)) { +val configuration = config.getConfiguration +if (!configuration.get(TableConfigOptions.TABLE_PLANNER).equals(PlannerType.BLINK)) { throw new IllegalArgumentException( "Mismatch between configured planner and actual planner. " + "Currently, the 'table.planner' can only be set when instantiating the " + "table environment. Subsequent changes are not supported. " + "Please instantiate a new TableEnvironment if necessary."); } +// Add a query start time to TableConfig, this config is only used internal, +// this config will be used by temporal functions like CURRENT_TIMESTAMP. +configuration.setLong("__table.query-start.epoch-time__", System.currentTimeMillis()) Review comment: Thanks for your feedback, I'll remove this flag -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] dianfu commented on a change in pull request #15362: [FLINK-21939][python] Support batch mode in Python DataStream API for process operation and reduce operation
dianfu commented on a change in pull request #15362: URL: https://github.com/apache/flink/pull/15362#discussion_r600982520 ## File path: flink-python/pyflink/datastream/tests/test_data_stream.py ## @@ -1068,6 +1013,35 @@ def on_timer(self, timestamp, ctx): expected_result.sort() self.assertEqual(expected_result, result) +def test_reduce_function_without_data_types(self): Review comment: I'll rename it to test_reduce_function to avoid confusion.
[GitHub] [flink] flinkbot edited a comment on pull request #15339: [FLINK-21448] Randomize ITTests for delegating state backend
flinkbot edited a comment on pull request #15339: URL: https://github.com/apache/flink/pull/15339#issuecomment-804687609 ## CI report: * 78a059002986b8c966a7916dce4da7eeea054a3c Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=15381) * 1fbc891a62624a340d1b4f6db8a4c4e6aaf0d649 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=15414) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-21613) Parse Compute Column with `IN` expression throws NPE
[ https://issues.apache.org/jira/browse/FLINK-21613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17308276#comment-17308276 ] jibiyr commented on FLINK-21613: [~twalthr] I seek advice from the calcite community, here is the discussion link [CALCITE-4548|https://issues.apache.org/jira/browse/CALCITE-4548] > Parse Compute Column with `IN` expression throws NPE > > > Key: FLINK-21613 > URL: https://issues.apache.org/jira/browse/FLINK-21613 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.13.0 >Reporter: Shuo Cheng >Priority: Major > > Considering the following given sql: > {code:sql} > CREATE TABLE MyInputFormatTable ( > `a` INT, > `b` BIGINT, > `c` STRING, > `d` as `c` IN ('Hi', 'Hello') > ) WITH ( > 'connector' = 'values', > 'data-id' = '$dataId', > 'runtime-source' = 'InputFormat' > ) > {code} > NPE will be thrown during parsing the sql: > `select * from MyInputFormatTable` > It seems it's the commit "[hotfix][table-planner-blink] Simplify SQL > expression to RexNode conversion" which introduces this problem. This hotfix > uses a method `SqlToRelConverter#convertExpression` and this method does not > has any tests and is not used in Calcite anywhere, which is unsafe. Maybe > reverting the hotfix is a good choice. > CC [~twalthr] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot edited a comment on pull request #15339: [FLINK-21448] Randomize ITTests for delegating state backend
flinkbot edited a comment on pull request #15339: URL: https://github.com/apache/flink/pull/15339#issuecomment-804687609 ## CI report: * 78a059002986b8c966a7916dce4da7eeea054a3c Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=15381) * 1fbc891a62624a340d1b4f6db8a4c4e6aaf0d649 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] SteNicholas commented on a change in pull request #15362: [FLINK-21939][python] Support batch mode in Python DataStream API for process operation and reduce operation
SteNicholas commented on a change in pull request #15362: URL: https://github.com/apache/flink/pull/15362#discussion_r600967800 ## File path: flink-python/pyflink/datastream/tests/test_data_stream.py ## @@ -1068,6 +1013,35 @@ def on_timer(self, timestamp, ctx): expected_result.sort() self.assertEqual(expected_result, result) +def test_reduce_function_without_data_types(self): Review comment: Could this add `test_reduce_function_with_data_types` test case? ## File path: flink-python/pyflink/datastream/tests/test_data_stream.py ## @@ -1118,6 +1092,35 @@ def on_timer(self, timestamp, ctx): expected_result.sort() self.assertEqual(expected_result, result) +def test_reduce_function_without_data_types(self): Review comment: Could this add `test_reduce_function_with_data_types` test case? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #15252: [FLINK-20654][tests] Forcing network action logger on TRACE for Unaligned Checkpoint ITCases.
flinkbot edited a comment on pull request #15252: URL: https://github.com/apache/flink/pull/15252#issuecomment-801056952 ## CI report: * d428f852c942869f699c84b7a942686d88fac2ea Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=15409) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] maosuhan commented on a change in pull request #14376: [FLINK-18202][PB format] New Format of protobuf
maosuhan commented on a change in pull request #14376: URL: https://github.com/apache/flink/pull/14376#discussion_r600964583 ## File path: flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbSchemaValidator.java ## @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.formats.protobuf; + +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.RowType; + +import com.google.protobuf.Descriptors; +import com.google.protobuf.Descriptors.FieldDescriptor; +import com.google.protobuf.Descriptors.FieldDescriptor.JavaType; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class PbSchemaValidator { + private Descriptors.Descriptor descriptor; + private RowType rowType; + private Map<JavaType, List<LogicalTypeRoot>> typeMatchMap = new HashMap<>(); + + public PbSchemaValidator(Descriptors.Descriptor descriptor, RowType rowType) { + this.descriptor = descriptor; + this.rowType = rowType; + typeMatchMap.put(JavaType.BOOLEAN, Collections.singletonList(LogicalTypeRoot.BOOLEAN)); + typeMatchMap.put( + JavaType.BYTE_STRING, + Arrays.asList(LogicalTypeRoot.BINARY, LogicalTypeRoot.VARBINARY)); + typeMatchMap.put(JavaType.DOUBLE, Collections.singletonList(LogicalTypeRoot.DOUBLE)); + typeMatchMap.put(JavaType.FLOAT, Collections.singletonList(LogicalTypeRoot.FLOAT)); + typeMatchMap.put( + JavaType.ENUM, + Arrays.asList(LogicalTypeRoot.VARCHAR, LogicalTypeRoot.CHAR)); + typeMatchMap.put( + JavaType.STRING, + Arrays.asList(LogicalTypeRoot.VARCHAR, LogicalTypeRoot.CHAR)); + typeMatchMap.put(JavaType.INT, Collections.singletonList(LogicalTypeRoot.INTEGER)); + typeMatchMap.put(JavaType.LONG, Collections.singletonList(LogicalTypeRoot.BIGINT)); + } + + public Descriptors.Descriptor getDescriptor() { + return descriptor; + } + + public void setDescriptor(Descriptors.Descriptor descriptor) { + this.descriptor = descriptor; + } + + public RowType getRowType() { + return rowType; + } + + public void 
setRowType(RowType rowType) { + this.rowType = rowType; + } + + public void validate() { + validateTypeMatch(descriptor, rowType); + if (!descriptor.getFile().getOptions().getJavaPackage() + .equals(descriptor.getFile().getPackage())) { + throw new IllegalArgumentException( + "java_package and package must be the same in proto definition"); + } + if (!descriptor.getFile().getOptions().getJavaMultipleFiles()) { Review comment: Now the PR can support different combinations of java_package, package, java_outer_class_name, java_multiple_files now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] maosuhan commented on pull request #14376: [FLINK-18202][PB format] New Format of protobuf
maosuhan commented on pull request #14376: URL: https://github.com/apache/flink/pull/14376#issuecomment-806273548 @wuchong Thanks for your opinion. 1. Regarding your suggestion, should we use `connector..default_value=""`? The field type must be array or map, because only these 2 types do not tolerate null values in protobuf. The `` only supports simple types like int/string/float/enum, etc. And if the user does not set this param, we will use protobuf's default value. 2. I had a look at `org.apache.flink.table.data.conversion.StructuredObjectConverter#generateCode`, and the implementation is similar to my code. I can use `org.apache.flink.table.runtime.generated.CompileUtils` to handle the code compiling and classloading work. Does that sound good? @wuchong @libenchao
[GitHub] [flink] flinkbot edited a comment on pull request #15262: [FLINK-21745][tests] Sets the parallelism in tests to comply to the AdaptiveScheduler requirements
flinkbot edited a comment on pull request #15262: URL: https://github.com/apache/flink/pull/15262#issuecomment-801762943 ## CI report: * c0aa1e5299dce8c6aaf38266cbfe4729c94d18f7 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=15407) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] zentol commented on pull request #15348: [FLINK-21844][runtime] Do not auto-configure maxParallelism in REACTIVE scheduling mode
zentol commented on pull request #15348: URL: https://github.com/apache/flink/pull/15348#issuecomment-806229037 > it does not seem possible to be able to detect that scenario from the information available [...] Correct. > [throw an exception] if the StateAssignmentOperation attempts to lower [the maxParallelism] in reactive mode Yes, that should be sufficient. > I think being able to detect these scenarios would require adding how the max parallelism was set to the checkpoint data, which does not seem to be worth the change. Does that sound reasonable to you? Handling that is out of scope of the PR. The solution would be to read the savepoint before the ExecutionGraph is created, extract the max parallelism for all vertices, set the maxParallelism in some form, and later assign the states to the actual vertices when the EG is created. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
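The guard agreed on above, failing fast when restoring a savepoint would lower an operator's max parallelism in reactive mode, can be sketched as follows (function and parameter names are hypothetical; the real check would live in `StateAssignmentOperation`):

```python
def check_max_parallelism_on_restore(checkpoint_max, vertex_max, reactive):
    # Lowering max parallelism on restore would drop key groups that exist
    # in the checkpointed state, so in reactive mode refuse it outright
    # instead of silently rescaling.
    if reactive and vertex_max < checkpoint_max:
        raise ValueError(
            "Cannot lower maxParallelism from %d to %d in reactive mode"
            % (checkpoint_max, vertex_max))
    return vertex_max
```

Detecting *why* the checkpoint's value was chosen (explicitly set vs. auto-derived) would require storing that provenance in the checkpoint data, which is the part zentol scopes out of the PR.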
[GitHub] [flink] flinkbot edited a comment on pull request #15347: [FLINK-21933][kinesis] EFO consumer treats interrupts as retryable ex…
flinkbot edited a comment on pull request #15347: URL: https://github.com/apache/flink/pull/15347#issuecomment-805098241 ## CI report: * 86cef1cc8c4b61d5896b348acd566f2b28c7259a Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=15404) Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=15303) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #15313: [FLINK-19682] Actively timeout checkpoint barriers on the inputs
flinkbot edited a comment on pull request #15313: URL: https://github.com/apache/flink/pull/15313#issuecomment-803870812 ## CI report: * 1397d8c753aaaede7d94d5a2cca5ec236659a50f Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=15401) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #15241: [FLINK-5717][datastream] Fix NPE and lost timer during window merging for ContinuousProcessingTimeTrigger
flinkbot edited a comment on pull request #15241: URL: https://github.com/apache/flink/pull/15241#issuecomment-800363939 ## CI report: * df089cde666b843c4d9d99c0d10fb87414f9b61a UNKNOWN * bb40fc98aebe13ba666e2f99efa97bc60df792df Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=15405) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #15348: [FLINK-21844][runtime] Do not auto-configure maxParallelism in REACTIVE scheduling mode
flinkbot edited a comment on pull request #15348: URL: https://github.com/apache/flink/pull/15348#issuecomment-805219989 ## CI report: * 58162eda9036884c11ab0329d9f367e8b3780586 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=15321) * 548ddea0488e0df73a8fd37234404fb25001c598 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=15413) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #15355: [FLINK-21076][docs] Add section about Adaptive Scheduler
flinkbot edited a comment on pull request #15355: URL: https://github.com/apache/flink/pull/15355#issuecomment-805578029 ## CI report: * 765978b6268a9f3dfd4af55a7c80d595806458ab Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=15408) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #15252: [FLINK-20654][tests] Forcing network action logger on TRACE for Unaligned Checkpoint ITCases.
flinkbot edited a comment on pull request #15252: URL: https://github.com/apache/flink/pull/15252#issuecomment-801056952 ## CI report: * 670063c0cb5eadea73f7928f526b0a633b39cd23 Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=15403) * d428f852c942869f699c84b7a942686d88fac2ea Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=15409) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #15221: [FLINK-17012][streaming] 'RUNNING' state split into 'RUNNING' and 'RE…
flinkbot edited a comment on pull request #15221: URL: https://github.com/apache/flink/pull/15221#issuecomment-799620682

## CI report:

* 84f94361cb2589fe0e4b5a128e60efd983ab330f Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=15400)

Bot commands: The @flinkbot bot supports the following commands:
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #15348: [FLINK-21844][runtime] Do not auto-configure maxParallelism in REACTIVE scheduling mode
flinkbot edited a comment on pull request #15348: URL: https://github.com/apache/flink/pull/15348#issuecomment-805219989

## CI report:

* 58162eda9036884c11ab0329d9f367e8b3780586 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=15321)
* 548ddea0488e0df73a8fd37234404fb25001c598 UNKNOWN

Bot commands: The @flinkbot bot supports the following commands:
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #15252: [FLINK-20654][tests] Forcing network action logger on TRACE for Unaligned Checkpoint ITCases.
flinkbot edited a comment on pull request #15252: URL: https://github.com/apache/flink/pull/15252#issuecomment-801056952

## CI report:

* 45de0f2c34f9df12d10c520c8ff42883136d3222 Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=15399)
* 670063c0cb5eadea73f7928f526b0a633b39cd23 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=15403)
* d428f852c942869f699c84b7a942686d88fac2ea Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=15409)

Bot commands: The @flinkbot bot supports the following commands:
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #15121: The method $(String) is undefined for the type TableExample
flinkbot edited a comment on pull request #15121: URL: https://github.com/apache/flink/pull/15121#issuecomment-793480240

## CI report:

* dbdacac0f2330873d609a5750ea027673201aaa1 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=15398)

Bot commands: The @flinkbot bot supports the following commands:
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-statefun] sjwiesman commented on a change in pull request #218: [FLINK-21955]-[FLINK-21960] Pre-release sweep.
sjwiesman commented on a change in pull request #218: URL: https://github.com/apache/flink-statefun/pull/218#discussion_r600616433

## File path: statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/RoutableProtobufKafkaIngressDeserializer.java

## @@ -58,4 +60,17 @@ public Message deserialize(ConsumerRecord input) {
           .setPayloadBytes(MoreByteStrings.wrap(payload))
           .build();
     }
   +
   +  private byte[] requireNonNullKey(byte[] key) {

Review comment: Do we also need this check for Kinesis?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
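The hunk above is truncated after the `requireNonNullKey` signature, so for orientation only, here is a minimal standalone sketch of what such a guard might look like. The method body and error message are assumptions, not the actual PR code:

```java
// Hypothetical sketch of a null-key guard for a routable ingress
// deserializer. Only the method name comes from the diff above; the
// body and the exception text are illustrative assumptions.
final class KeyGuard {

    static byte[] requireNonNullKey(byte[] key) {
        if (key == null) {
            // A routable ingress needs the record key to pick a target,
            // so a missing key is a hard error rather than a silent skip.
            throw new IllegalStateException(
                "Routable ingress records must carry a non-null key.");
        }
        return key;
    }
}
```

The same reasoning would apply to any ingress that routes by record key, which is presumably why the reviewer asks whether Kinesis needs the identical check.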
[GitHub] [flink] flinkbot edited a comment on pull request #15252: [FLINK-20654][tests] Forcing network action logger on TRACE for Unaligned Checkpoint ITCases.
flinkbot edited a comment on pull request #15252: URL: https://github.com/apache/flink/pull/15252#issuecomment-801056952

## CI report:

* 45de0f2c34f9df12d10c520c8ff42883136d3222 Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=15399)
* 670063c0cb5eadea73f7928f526b0a633b39cd23 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=15403)
* d428f852c942869f699c84b7a942686d88fac2ea UNKNOWN

Bot commands: The @flinkbot bot supports the following commands:
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] austince edited a comment on pull request #15348: [FLINK-21844][runtime] Do not auto-configure maxParallelism in REACTIVE scheduling mode
austince edited a comment on pull request #15348: URL: https://github.com/apache/flink/pull/15348#issuecomment-806171998

It does not seem possible to detect that scenario from the information available in a CompletedCheckpoint, nor is it possible to detect the user-error scenarios that I noted with the given information, so for all cases I would lean towards:

* logging this
* not updating the max parallelism if the StateAssignmentOperation attempts to lower it in reactive mode

I think being able to detect these scenarios would require adding how the max parallelism was set to the checkpoint data, which does not seem to be worth the change. Does that sound reasonable to you?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] austince edited a comment on pull request #15348: [FLINK-21844][runtime] Do not auto-configure maxParallelism in REACTIVE scheduling mode
austince edited a comment on pull request #15348: URL: https://github.com/apache/flink/pull/15348#issuecomment-806171998

It does not seem possible to detect that scenario from the information available in a CompletedCheckpoint, nor is it possible to detect the user-error scenarios that I noted with the given information, so for all cases I would lean towards:

* logging this
* not updating the max parallelism if the StateAssignmentOperation attempts to lower it in reactive mode

Does that sound reasonable to you?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] austince edited a comment on pull request #15348: [FLINK-21844][runtime] Do not auto-configure maxParallelism in REACTIVE scheduling mode
austince edited a comment on pull request #15348: URL: https://github.com/apache/flink/pull/15348#issuecomment-806171998

It does not seem possible to detect that scenario from the information available in a CompletedCheckpoint, nor is it possible to detect the user-error scenarios that I noted with the given information, so I would lean towards logging this error for all cases and not updating the max parallelism if the StateAssignmentOperation attempts to lower it in reactive mode. Does that sound reasonable to you?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] austince commented on pull request #15348: [FLINK-21844][runtime] Do not auto-configure maxParallelism in REACTIVE scheduling mode
austince commented on pull request #15348: URL: https://github.com/apache/flink/pull/15348#issuecomment-806171998

It does not seem possible to detect that scenario from the information available in a CompletedCheckpoint, nor is it possible to detect the user-error scenarios that I noted with the given information, so I would lean towards logging this error and not updating the max parallelism if the StateAssignmentOperation attempts to lower it in reactive mode. Does that sound reasonable to you?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
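The behaviour proposed in the comment above ("log, and don't let the restore lower the max parallelism in reactive mode") can be sketched as follows. The class and method names here are hypothetical helpers for illustration, not actual Flink internals:

```java
import java.util.logging.Logger;

// Hedged sketch of the proposed resolution rule (hypothetical names):
// when running in reactive mode, a checkpoint that would *lower* the
// already-configured max parallelism is logged and ignored; otherwise
// the checkpoint's value wins, as on a normal restore.
final class MaxParallelismGuard {

    private static final Logger LOG =
        Logger.getLogger(MaxParallelismGuard.class.getName());

    static int resolveMaxParallelism(
            int current, int fromCheckpoint, boolean reactiveMode) {
        if (reactiveMode && fromCheckpoint < current) {
            LOG.warning(
                "Checkpoint would lower max parallelism from " + current
                    + " to " + fromCheckpoint + "; keeping " + current
                    + " in reactive mode.");
            return current;
        }
        return fromCheckpoint;
    }
}
```

Note that this deliberately cannot distinguish the user-error scenarios from the legitimate one, which matches the comment's point that a CompletedCheckpoint does not record how its max parallelism was originally set.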
[GitHub] [flink] zentol commented on pull request #15348: [FLINK-21844][runtime] Do not auto-configure maxParallelism in REACTIVE scheduling mode
zentol commented on pull request #15348: URL: https://github.com/apache/flink/pull/15348#issuecomment-806166285

Here's a scenario where no user-error is involved:

- user has savepoint with an automatically derived max parallelism of N
- user starts a job from that SP with reactive mode enabled and no explicit max parallelism
- cluster has more than N slots available at the start, scheduler sets parallelism accordingly
- IllegalStateException occurs

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
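For context on how the "automatically derived" value N in this scenario comes about: when no max parallelism is set explicitly, Flink derives one from the operator parallelism, roughly 1.5x the parallelism rounded up to a power of two and clamped to [128, 32768]. The exact formula is treated here as an assumption; this sketch only illustrates the shape of the problem:

```java
// Approximation (assumed, not copied from Flink) of the automatic
// max-parallelism derivation. A savepoint taken at parallelism 100
// carries a derived max parallelism of 256 under this formula, so a
// reactive cluster that later scales the job past 256 slots hits the
// IllegalStateException described above.
final class MaxParallelismMath {

    // Round x up to the next power of two (for x >= 1).
    static int roundUpToPowerOfTwo(int x) {
        return x <= 1 ? 1 : Integer.highestOneBit(x - 1) << 1;
    }

    static int computeDefaultMaxParallelism(int parallelism) {
        int candidate = roundUpToPowerOfTwo(parallelism + parallelism / 2);
        return Math.min(Math.max(candidate, 128), 32768);
    }
}
```

Under this derivation, small jobs all land on the lower bound of 128, so the conflict only becomes visible once reactive scaling pushes the runtime parallelism past the derived bound.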
[GitHub] [flink] flinkbot edited a comment on pull request #15355: [FLINK-21076][docs] Add section about Adaptive Scheduler
flinkbot edited a comment on pull request #15355: URL: https://github.com/apache/flink/pull/15355#issuecomment-805578029

## CI report:

* 0abc610ce3d013b86e67d48263f1811874bb11c6 Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=15406)
* 765978b6268a9f3dfd4af55a7c80d595806458ab Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=15408)

Bot commands: The @flinkbot bot supports the following commands:
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] flinkbot edited a comment on pull request #15252: [FLINK-20654][tests] Forcing network action logger on TRACE for Unaligned Checkpoint ITCases.
flinkbot edited a comment on pull request #15252: URL: https://github.com/apache/flink/pull/15252#issuecomment-801056952

## CI report:

* 45de0f2c34f9df12d10c520c8ff42883136d3222 Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=15399)
* 670063c0cb5eadea73f7928f526b0a633b39cd23 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=15403)

Bot commands: The @flinkbot bot supports the following commands:
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org