[jira] [Created] (FLINK-24287) Bump virtualenv to the latest version
Dian Fu created FLINK-24287: --- Summary: Bump virtualenv to the latest version Key: FLINK-24287 URL: https://issues.apache.org/jira/browse/FLINK-24287 Project: Flink Issue Type: Improvement Components: API / Python, Tests Reporter: Dian Fu Assignee: Dian Fu Fix For: 1.15.0 Currently, the virtualenv version (16.0.0) used in PyFlink tests is outdated compared to the latest release. The pip bundled with this old virtualenv is also old; as a consequence, it compiles the grpcio library from source instead of using the wheel package when installing grpcio. Compiling grpcio takes several minutes, and that time could be avoided by bumping virtualenv. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24286) Flink TableEnvironment executesql on IntelliJ leads to Could not find a file system implementation for scheme 's3a'
James Kim created FLINK-24286: - Summary: Flink TableEnvironment executesql on IntelliJ leads to Could not find a file system implementation for scheme 's3a' Key: FLINK-24286 URL: https://issues.apache.org/jira/browse/FLINK-24286 Project: Flink Issue Type: Bug Components: Table SQL / API, Table SQL / Client Affects Versions: 1.13.2 Environment: Ubuntu 18.04 Reporter: James Kim

I'm trying to use the Table API in a simple Java class to create tables, run queries, retrieve the results and use that data for computation. The data is a CSV file read via s3a (S3-compatible storage). When I open a terminal tab, start the cluster (standalone) in the Flink directory, and run queries from the embedded Flink SQL client in another tab, everything works fine; I have the proper settings in conf/flink-conf.yaml. However, now I'm trying to do this programmatically, so I created a separate project in IntelliJ, but when I run the program I get the following error:

"Caused by: java.lang.RuntimeException: org.apache.flink.runtime.JobException: Creating the input splits caused an error: Could not find a file system implementation for scheme 's3a'. The scheme is directly supported by Flink through the following plugin: flink-s3-fs-hadoop. Please ensure that each plugin resides within its own subfolder within the plugins directory. See https://ci.apache.org/projects/flink/flink-docs-stable/ops/plugins.html for more information. If you want to use a Hadoop file system for that scheme, please add the scheme to the configuration fs.allowed-fallback-filesystems."

I've seen and fixed this error when running from the terminal, but when I run the Main class directly from IntelliJ, I get the above error. Is there a way to configure the Main class to read from the flink-conf.yaml file, which is in a different path?

Main.java:
{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

public class Main {
    public static void main(String[] args) {
        // create a TableEnvironment for batch or streaming execution
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inBatchMode()
                .build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // create an input Table
        TableResult tempResult = tableEnv.executeSql(
                //"create temporary table ATHLETES (\n" +
                "create table ATHLETES (\n" +
                "name varchar,\n" +
                "country varchar,\n" +
                "sport varchar\n" +
                ") with (\n" +
                "'connector' = 'filesystem',\n" +
                "'path'='s3a://testbucket/james_experiment/2020_Tokyo_Olympics/Athletes.csv',\n" +
                "'format'='csv'\n" +
                ")\n");

        TableResult table2 = tableEnv.executeSql("select * from ATHLETES");
    }
}
{code}
pom.xml:
{code:xml}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>groupId</groupId>
    <artifactId>flink-ecs-sample</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
            <version>1.13.2</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.11</artifactId>
            <version>1.13.2</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.13.2</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>1.13.2</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>1.13.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.13.2</version>
        </dependency>
    </dependencies>
</project>
{code}
-- This message was sent by Atlassian Jira (v8.3.4#803005)
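Not a confirmed fix, but one way this is sometimes handled when running from an IDE is to load the flink-conf.yaml explicitly and initialize the filesystem plugins before the Table program runs. The sketch below makes assumptions: the conf path is a placeholder, and the plugins folder is typically resolved from the FLINK_PLUGINS_DIR environment variable (so it may need to be set in the IntelliJ run configuration).
{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.plugin.PluginUtils;

public class S3aBootstrap {
    public static void main(String[] args) throws Exception {
        // Placeholder path: the conf/ directory holding the same flink-conf.yaml
        // that the standalone cluster and SQL client pick up.
        Configuration conf = GlobalConfiguration.loadConfiguration("/opt/flink-1.13.2/conf");

        // Register filesystem plugins (e.g. flink-s3-fs-hadoop under plugins/s3-fs-hadoop/)
        // before any s3a:// path is touched. The plugins folder is looked up via the
        // FLINK_PLUGINS_DIR environment variable, falling back to ./plugins.
        FileSystem.initialize(conf, PluginUtils.createPluginManagerFromRootFolder(conf));

        // ...then build the TableEnvironment and call executeSql(...) as in Main.java above.
    }
}
{code}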
[jira] [Created] (FLINK-24285) Flink Table API Could not find any format factory for identifier 'csv' in the classpath.
James Kim created FLINK-24285: - Summary: Flink Table API Could not find any format factory for identifier 'csv' in the classpath. Key: FLINK-24285 URL: https://issues.apache.org/jira/browse/FLINK-24285 Project: Flink Issue Type: Bug Components: Table SQL / API, Table SQL / Client Affects Versions: 1.13.2 Environment: Ubuntu 18.04 Reporter: James Kim

I'm trying to read a CSV file from S3-compatible storage via the s3a protocol from Java code. This is the main class that I have:
{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

public class Main {
    public static void main(String[] args) {
        // create a TableEnvironment for batch or streaming execution
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inBatchMode()
                .build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // create an input Table
        TableResult tempResult = tableEnv.executeSql(
                //"create temporary table ATHLETES (\n" +
                "create table ATHLETES (\n" +
                "name varchar,\n" +
                "country varchar,\n" +
                "sport varchar\n" +
                ") with (\n" +
                "'connector' = 'filesystem',\n" +
                "'path'='s3a://testbucket/expFolder/2020_Tokyo_Olympics/Athletes.csv',\n" +
                "'format'='csv'\n" +
                ")\n");

        TableResult table2 = tableEnv.executeSql("select * from ATHLETES");
    }
}
{code}
However, when I run this code, I get an exception at the executeSql call. The error log is the following:
{code:java}
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.ATHLETES'.
Table options are: 'connector'='filesystem''format'='csv''path'='s3a://testbucket/james_experiment/2020_Tokyo_Olympics/Athletes.csv' at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:137) at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createDynamicTableSource(CatalogSourceTable.java:116) at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:82) at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585) at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050) at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663) at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644) at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438) at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:169) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:161) at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:989) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:958) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:283) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:724) at Main.main(Main.java:31)Caused by: org.apache.flink.table.api.ValidationException: Could not find any format factory for identifier 'csv' in the classpath. at org.apache.flink.table.filesystem.FileSystemTableSource.(FileSystemTableSource.java:97) at org.apache.flink.table.filesystem.FileSystemTableFactory.createDynamicTableSource(FileSystemTableFactory.java:74) at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:134) ... 19 more {code} The exception
Re: Beam with Flink runner - Issues when writing to S3 in Parquet Format
Hi Sandeep, Jan has already provided pretty good guidelines for getting more context on the issue ;) Because this is not the first time, I would like to raise awareness that it's not OK to send a user-related question to four Apache mailing lists (that I know of). Namely: - u...@flink.apache.org - dev@flink.apache.org - u...@beam.apache.org - d...@beam.apache.org Community focus is a very precious resource that should be used wisely. All of these mailing lists answer many complex questions each day, and it's very unfortunate if any of this work needs to be duplicated. Next time, please direct Beam-related user questions solely to u...@beam.apache.org. Thanks for your understanding. You can consult the community guidelines [1][2] if you are not sure where a particular question belongs. [1] https://flink.apache.org/community.html#mailing-lists [2] https://beam.apache.org/community/contact-us/ Best, D. On Tue, Sep 14, 2021 at 5:47 PM Jan Lukavský wrote: > Hi Sandeep, > a few questions: > a) which state backend do you use for Flink? > b) what is your checkpointingInterval set for FlinkRunner? > c) how much data is there in your input Kafka topic(s)? > > FileIO has to buffer all elements per window (by default) into state, so > this might create a high pressure on state backend and/or heap, which could > result in suboptimal performance. Due to the "connection loss" and timeout > exceptions you describe I'd suppose there might be a lot of GC pressure. > > Jan > On 9/14/21 5:20 PM, Kathula, Sandeep wrote: > > Hi, > >We have a simple Beam application which reads from Kafka, converts to > parquet and write to S3 with Flink runner (Beam 2.29 and Flink 1.12). We > have a fixed window of 5 minutes after conversion to > PCollection and then writing to S3. We have around 320 > columns in our data. Our intention is to write large files of size 128MB or > more so that it won’t have a small file problem when reading back from > Hive. But from what we observed it is taking too much memory to write to S3 > (giving memory of 8GB to heap is not enough to write 50 MB files and it is > going OOM). When I increase memory for heap to 32GB then it take lot of > time to write records to s3. > > For instance it takes: > > > > 20 MB file - 30 sec > > 50 MB file - 1 min 16 sec > > 75 MB file - 2 min 15 sec > > 83 MB file - 2 min 40 sec > > > > Code block to write to S3: > > PCollection parquetRecord = ……. > > > > parquetRecord.apply(FileIO.*write*() > .via(ParquetIO.*sink*(getOutput_schema())) > .to(outputPath.isEmpty() ? outputPath() : outputPath) > .withNumShards(5) > .withNaming(new CustomFileNaming("snappy.parquet"))); > > > > > > We are also getting different exceptions like: > > > >1.
*UserCodeException*: > > > > Caused by: org.apache.beam.sdk.util.UserCodeException: > org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: > Could not forward element to next operator > > at > org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36) > > at > com.intuit.data.platform.process.thrivev2.parquet.ParquetWriter$DoFnInvoker.invokeProcessElement(Unknown > Source) > > at > org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227) > > at > org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186) > > at > org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62) > > at > org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601) > > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637) > > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612) > > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592) > > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727) > > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705) > > at > org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1068) > > at > org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1022) > > at > org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267) > > at > org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79) > > at >
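As a side note on Jan's questions (a) and (b): both knobs are set on the FlinkRunner through FlinkPipelineOptions. The sketch below is only illustrative; the option names are as exposed in recent Beam releases and the values are placeholders, not recommendations.
{code:java}
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class RunnerConfigSketch {
    public static void main(String[] args) {
        FlinkPipelineOptions options =
                PipelineOptionsFactory.fromArgs(args).withValidation().as(FlinkPipelineOptions.class);

        // (b) Checkpointing interval in milliseconds; checkpointing is disabled by default.
        options.setCheckpointingInterval(60_000L);

        // (a) Which state backend Flink uses. With FileIO buffering whole windows in state,
        // a disk-backed backend such as RocksDB usually relieves heap pressure.
        options.setStateBackend("rocksdb");

        Pipeline pipeline = Pipeline.create(options);
        // ... Kafka read, windowing, and the FileIO/ParquetIO write from the original snippet ...
        pipeline.run();
    }
}
{code}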
Beam with Flink runner - Issues when writing to S3 in Parquet Format
Hi, We have a simple Beam application which reads from Kafka, converts to Parquet, and writes to S3 with the Flink runner (Beam 2.29 and Flink 1.12). We have a fixed window of 5 minutes after conversion to PCollection and then writing to S3. We have around 320 columns in our data. Our intention is to write large files of size 128MB or more so that we won't have a small file problem when reading back from Hive. But from what we observed, it takes too much memory to write to S3 (giving 8GB of heap is not enough to write 50 MB files and it goes OOM). When I increase the heap to 32GB, it takes a lot of time to write records to S3. For instance it takes:

20 MB file - 30 sec
50 MB file - 1 min 16 sec
75 MB file - 2 min 15 sec
83 MB file - 2 min 40 sec

Code block to write to S3:

PCollection parquetRecord = …….

parquetRecord.apply(FileIO.write()
    .via(ParquetIO.sink(getOutput_schema()))
    .to(outputPath.isEmpty() ? outputPath() : outputPath)
    .withNumShards(5)
    .withNaming(new CustomFileNaming("snappy.parquet")));

We are also getting different exceptions like:

1. UserCodeException:

Caused by: org.apache.beam.sdk.util.UserCodeException: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36) at com.intuit.data.platform.process.thrivev2.parquet.ParquetWriter$DoFnInvoker.invokeProcessElement(Unknown Source) at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227) at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186) at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62) at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705) at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1068) at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1022) at org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267) at org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79) at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413) at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401) at com.intuit.data.platform.process.thrivev2.flattener.JsonFlattener.lambda$processElement$0(JsonFlattener.java:36) at java.lang.Iterable.forEach(Iterable.java:75) at com.intuit.data.platform.process.thrivev2.flattener.JsonFlattener.processElement(JsonFlattener.java:34) at com.intuit.data.platform.process.thrivev2.flattener.JsonFlattener$DoFnInvoker.invokeProcessElement(Unknown Source) at
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227) at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186) at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62) at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727) at
[jira] [Created] (FLINK-24284) Add a greeter and a showcase for the JavaScript SDK
Igal Shilman created FLINK-24284: Summary: Add a greeter and a showcase for the JavaScript SDK Key: FLINK-24284 URL: https://issues.apache.org/jira/browse/FLINK-24284 Project: Flink Issue Type: Improvement Components: Stateful Functions Reporter: Igal Shilman Assignee: Igal Shilman We need to add a greeter and a showcase for the JavaScript SDK to the playground. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24283) Pulsar connector won't use given hash ranges in Key_Shared mode
Yufan Sheng created FLINK-24283: --- Summary: Pulsar connector won't use given hash ranges in Key_Shared mode Key: FLINK-24283 URL: https://issues.apache.org/jira/browse/FLINK-24283 Project: Flink Issue Type: Bug Components: Connectors / Pulsar Affects Versions: 1.14.0 Reporter: Yufan Sheng Fix For: 1.14.0 The Pulsar broker keeps the old consumer selection even after the consumer has been closed. As a result, the sticky key range given to the connector does not take effect. We should apply the sticky hash range when seeking the initial position in the source enumerator. https://github.com/apache/pulsar/pull/12035 -- This message was sent by Atlassian Jira (v8.3.4#803005)
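For context, this is roughly what an explicit sticky hash range looks like at the plain Pulsar client level (the service URL, topic, subscription name, and range bounds below are placeholders); the connector is expected to apply the equivalent policy when it seeks the start position:
{code:java}
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.KeySharedPolicy;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.client.api.SubscriptionType;

public class KeySharedRangeExample {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        // Key_Shared subscription restricted to an explicit slice of the key hash space.
        // If the broker still holds the previous consumer's selection, this range is
        // ignored, which is the behavior described in this issue.
        Consumer<byte[]> consumer = client.newConsumer()
                .topic("persistent://public/default/example-topic")
                .subscriptionName("example-subscription")
                .subscriptionType(SubscriptionType.Key_Shared)
                .keySharedPolicy(KeySharedPolicy.stickyHashRange().ranges(Range.of(0, 32767)))
                .subscribe();

        consumer.close();
        client.close();
    }
}
{code}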
[jira] [Created] (FLINK-24282) KafkaRecordSerializationSchema TopicSelector is not serializable
Fabian Paul created FLINK-24282: --- Summary: KafkaRecordSerializationSchema TopicSelector is not serializable Key: FLINK-24282 URL: https://issues.apache.org/jira/browse/FLINK-24282 Project: Flink Issue Type: Bug Affects Versions: 1.14.0 Reporter: Fabian Paul To dynamically calculate the outgoing topic, we allow passing a lambda. Unfortunately, it is currently not marked as serializable; hence the following code fails during closure cleaning when used within a job.
{code:java}
KafkaRecordSerializationSchema.builder()
    .setTopic(topic)
    .setValueSerializationSchema(serSchema)
    .setPartitioner(partitioner)
    .build()
{code}
-- This message was sent by Atlassian Jira (v8.3.4#803005)
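Until TopicSelector itself extends Serializable, one possible workaround (a sketch only; it assumes the builder's setTopicSelector method and uses a made-up element type and routing rule) is to give the lambda an explicit Serializable witness via an intersection cast:
{code:java}
// Workaround sketch: cast the lambda so closure cleaning sees it as Serializable.
// "Event", its isAudit() accessor and the topic names are illustrative only.
KafkaRecordSerializationSchema.builder()
    .setTopicSelector((TopicSelector<Event> & Serializable) e -> e.isAudit() ? "audit-topic" : "event-topic")
    .setValueSerializationSchema(serSchema)
    .setPartitioner(partitioner)
    .build()
{code}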
Re: Contribute to Flink StateFun
Hi Evans, Welcome aboard :) I'll definitely keep that in mind! Meanwhile, I would also like to point out that StateFun is built using Apache Flink, and any improvement in Flink directly contributes to StateFun, so I would definitely encourage you to consider contributing to Flink as well. I'm positive that the greater Flink dev community will appreciate that. Cheers, Igal. On Tue, Sep 14, 2021 at 12:59 PM Evans Ye wrote: > Hi team, > > I'm Evans Ye and I'd like to contribute more to the Flink StateFun project. > Previously Gordon gave me a hand on mentoring me thus I had a lot of fun > contributing to the multi-lang E2E tests framework. > I know that the project PMC got a direction of the project, so it would be > great if the team can share some JIRAs that's on the priority list and are > open up for contributions. > Thanks. > > - Evans >
[jira] [Created] (FLINK-24281) Migrate all existing tests to new Kafka Sink
Fabian Paul created FLINK-24281: --- Summary: Migrate all existing tests to new Kafka Sink Key: FLINK-24281 URL: https://issues.apache.org/jira/browse/FLINK-24281 Project: Flink Issue Type: Improvement Components: Connectors / Kafka Affects Versions: 1.14.0, 1.15.0 Reporter: Fabian Paul The FlinkKafkaProducer has been deprecated since 1.14, but a lot of existing tests are still using it. We should replace it with the KafkaSink, which completely subsumes it. -- This message was sent by Atlassian Jira (v8.3.4#803005)
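For anyone picking this up, the rough shape of the replacement is sketched below (broker address, topic, and delivery guarantee are placeholders; the builder calls follow the 1.14 KafkaSink API):
{code:java}
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

// Old: stream.addSink(new FlinkKafkaProducer<>(...));
// New: build a KafkaSink and attach it with stream.sinkTo(sink).
KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers("localhost:9092")
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("test-topic")
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) // method name as in 1.14
        .build();
{code}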
[jira] [Created] (FLINK-24280) Support manual checkpoints triggering from a MiniCluster
Dawid Wysakowicz created FLINK-24280: Summary: Support manual checkpoints triggering from a MiniCluster Key: FLINK-24280 URL: https://issues.apache.org/jira/browse/FLINK-24280 Project: Flink Issue Type: New Feature Components: Runtime / Checkpointing, Runtime / Coordination Reporter: Dawid Wysakowicz Fix For: 1.15.0 The goal is to be able to trigger checkpoints manually at a desired time. The intention is to use it in tests. We do not want to make this a user-facing feature. -- This message was sent by Atlassian Jira (v8.3.4#803005)
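A sketch of how a test might eventually use this; since the feature does not exist yet, the triggerCheckpoint method name, its location on MiniCluster, and the return type are assumptions rather than a real API:
{code:java}
// Hypothetical test usage -- assumes MiniCluster would expose something like
// triggerCheckpoint(JobID) returning a future that completes with the checkpoint id.
MiniClusterWithClientResource flinkCluster = ...;
JobID jobId = ...;

// Drive the job into a well-defined state first, then trigger a checkpoint
// deterministically instead of waiting for the periodic checkpoint interval.
long checkpointId = flinkCluster.getMiniCluster().triggerCheckpoint(jobId).get();
{code}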
Contribute to Flink StateFun
Hi team, I'm Evans Ye and I'd like to contribute more to the Flink StateFun project. Previously Gordon mentored me, and I had a lot of fun contributing to the multi-lang E2E tests framework. I know that the project PMC has a direction for the project, so it would be great if the team could share some JIRAs that are on the priority list and open for contributions. Thanks. - Evans
[jira] [Created] (FLINK-24279) Support withBroadcast in DataStream API
ZHANG ZHIPENG created FLINK-24279: - Summary: Support withBroadcast in DataStream API Key: FLINK-24279 URL: https://issues.apache.org/jira/browse/FLINK-24279 Project: Flink Issue Type: New Feature Components: Library / Machine Learning Reporter: ZHANG ZHIPENG When doing machine learning using the DataStream API, we found that DataStream lacks a withBroadcast() function, which could be useful in machine learning. A DataSet-based demo looks like this:

DataSet d1 = ...;
DataSet d2 = ...;
d1.map(new RichMapFunction() {
    @Override
    public Object map(Object aLong) throws Exception {
        List elements = getRuntimeContext().getBroadcastVariable("d2");
        ...
    }
}).withBroadcastSet(d2, "d2");

-- This message was sent by Atlassian Jira (v8.3.4#803005)
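For comparison, the closest the DataStream API currently offers is broadcast state via connect(), which is noticeably more ceremony than the DataSet-style withBroadcastSet. A sketch (element types, the descriptor name, and the processing logic are illustrative):
{code:java}
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

// d1: the main input, d2: the (small) stream to make available on every parallel task.
DataStream<Long> d1 = ...;
DataStream<Long> d2 = ...;

MapStateDescriptor<String, Long> descriptor =
        new MapStateDescriptor<>("d2", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);

BroadcastStream<Long> broadcast = d2.broadcast(descriptor);

d1.connect(broadcast)
        .process(new BroadcastProcessFunction<Long, Long, Long>() {
            @Override
            public void processElement(Long value, ReadOnlyContext ctx, Collector<Long> out) throws Exception {
                // read-only access to the broadcast state here
                out.collect(value);
            }

            @Override
            public void processBroadcastElement(Long value, Context ctx, Collector<Long> out) throws Exception {
                ctx.getBroadcastState(descriptor).put("latest", value);
            }
        });
{code}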
[jira] [Created] (FLINK-24278) [FLIP-171] Async Sink Base Sink Developer Guide for Documentation
Zichen Liu created FLINK-24278: -- Summary: [FLIP-171] Async Sink Base Sink Developer Guide for Documentation Key: FLINK-24278 URL: https://issues.apache.org/jira/browse/FLINK-24278 Project: Flink Issue Type: New Feature Components: Connectors / Common Reporter: Zichen Liu Assignee: Zichen Liu Fix For: 1.15.0

*User stories*
* As a Sink user, I’d like to configure the batch size for items to send to the destination at once (e.g. “flush if there are x number of items in the batch”)
* As a Sink user, I’d like to configure the batching logic so that I can flush the batch of requests based on a time period (e.g. “flush every 2 seconds”)
* As a Sink user, I’d like to specify the number of bytes for the batch of requests to be flushed (e.g. “submit the batch after the total number of bytes in it is above 1KB”)
* As a Sink developer, I’d like to use the configuration mechanism provided to allow Sink users to configure my Sink implementation

*Scope*
* Allow Sink developers and users to pass batch size config to the AsyncSinkWriter
* Add support for time-based flushing (e.g. “flush after x milliseconds”) using the ProcessingTimeService, which is part of the Sink interface
* Add support for byte-based flushing
* Consider the combination of time-based and byte-based flushing: if there are more bytes than configured in the time-based batch, then the last few (however many necessary) items should go into the next batch to satisfy the requirement for the number of bytes (see the sketch below)

*References*
More details to be found at [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]

-- This message was sent by Atlassian Jira (v8.3.4#803005)
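The combined flush condition those scope items describe boils down to something like the following standalone illustration. This is not the actual AsyncSinkWriter code; all names are made up, and the real implementation would also handle splitting an oversized batch as described above.
{code:java}
// Illustrative flush decision combining the three triggers from the user stories.
// In the real sink the time-based trigger would be driven by the Sink's
// ProcessingTimeService rather than a caller-supplied elapsed time.
final class FlushPolicy {
    private final int maxBatchSize;        // "flush if there are x items in the batch"
    private final long maxBatchSizeBytes;  // "flush once the batch exceeds x bytes"
    private final long maxTimeInBufferMs;  // "flush every x milliseconds"

    FlushPolicy(int maxBatchSize, long maxBatchSizeBytes, long maxTimeInBufferMs) {
        this.maxBatchSize = maxBatchSize;
        this.maxBatchSizeBytes = maxBatchSizeBytes;
        this.maxTimeInBufferMs = maxTimeInBufferMs;
    }

    boolean shouldFlush(int bufferedCount, long bufferedBytes, long msSinceOldestEntry) {
        return bufferedCount >= maxBatchSize
                || bufferedBytes >= maxBatchSizeBytes
                || msSinceOldestEntry >= maxTimeInBufferMs;
    }
}
{code}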
Unsubscribe
[jira] [Created] (FLINK-24277) Offset commit should be disabled if consumer group ID is not specified in KafkaSource
Qingsheng Ren created FLINK-24277: - Summary: Offset commit should be disabled if consumer group ID is not specified in KafkaSource Key: FLINK-24277 URL: https://issues.apache.org/jira/browse/FLINK-24277 Project: Flink Issue Type: Bug Components: Connectors / Kafka Affects Versions: 1.14.0 Reporter: Qingsheng Ren Fix For: 1.14.0 FLINK-24051 made "group.id" an optional configuration in KafkaSource. However, KafkaSource generates a random group ID if the user doesn't specify one, and this random ID is inconsistent across failovers and not even human readable. A solution would be to add a configuration for offset commit on checkpoint, make it true by default, and disable offset commit if the group ID is not specified. -- This message was sent by Atlassian Jira (v8.3.4#803005)
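For reference, this is how a user can already pin the behaviour explicitly with the 1.14 builder (broker address and topic are placeholders; the property key is the existing commit-on-checkpoint switch):
{code:java}
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("localhost:9092")
        .setTopics("input-topic")
        .setValueOnlyDeserializer(new SimpleStringSchema())
        // Explicit, human-readable group id so committed offsets stay meaningful across restarts.
        .setGroupId("my-pipeline")
        // Alternatively, keep the generated group id but turn off offset commits on checkpoint.
        .setProperty("commit.offsets.on.checkpoint", "false")
        .build();
{code}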