[jira] [Created] (FLINK-24287) Bump virtualenv to the latest version

2021-09-14 Thread Dian Fu (Jira)
Dian Fu created FLINK-24287:
---

 Summary: Bump virtualenv to the latest version
 Key: FLINK-24287
 URL: https://issues.apache.org/jira/browse/FLINK-24287
 Project: Flink
  Issue Type: Improvement
  Components: API / Python, Tests
Reporter: Dian Fu
Assignee: Dian Fu
 Fix For: 1.15.0


Currently, the virtualenv version (16.0.0) used in PyFlink tests is a little 
outdated (the latest version is ). The pip bundled in this old virtualenv is also 
old; as a consequence, grpcio is compiled from source instead of being installed 
from its wheel package. Compiling grpcio takes several minutes, which could be 
avoided after bumping virtualenv.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24286) Flink TableEnvironment executesql on IntelliJ leads to Could not find a file system implementation for scheme 's3a'

2021-09-14 Thread James Kim (Jira)
James Kim created FLINK-24286:
-

 Summary: Flink TableEnvironment executesql on IntelliJ leads to 
Could not find a file system implementation for scheme 's3a'
 Key: FLINK-24286
 URL: https://issues.apache.org/jira/browse/FLINK-24286
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API, Table SQL / Client
Affects Versions: 1.13.2
 Environment: Ubuntu 18.04
Reporter: James Kim


I'm trying to use the Table API in a simple Java class to create tables, run 
queries, retrieve the results and use that data for computation. The data is a 
CSV file read via s3a (S3-compatible storage).

When I open a terminal tab, start the standalone cluster in the Flink directory, 
and then run queries from the embedded Flink SQL Client in another tab, it works 
fine. I have the proper configuration in conf/flink-conf.yaml.

 

However, now I'm trying to do this programmatically from code, so I created a 
separate project in IntelliJ, but when I run the program, I get the 
following error:

"Caused by: java.lang.RuntimeException: org.apache.flink.runtime.JobException: 
Creating the input splits caused an error: Could not find a file system 
implementation for scheme 's3a'. The scheme is directly supported by Flink 
through the following plugin: flink-s3-fs-hadoop. Please ensure that each 
plugin resides within its own subfolder within the plugins directory. See 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/plugins.html for 
more information. If you want to use a Hadoop file system for that scheme, 
please add the scheme to the configuration fs.allowed-fallback-filesystems."

 

I've seen and fixed this error when running from the terminal, but when I run the 
Main class directly from IntelliJ, I get the above error.

Is there a way to configure the Main class to read the flink-conf.yaml 
file, which is in a different path?
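
For reference, one way to do this is to set the FLINK_CONF_DIR environment 
variable in the IntelliJ run configuration. Alternatively, the configuration 
directory can be loaded programmatically; the following is a minimal sketch, 
assuming Flink's GlobalConfiguration and FileSystem.initialize utilities (paths 
are placeholders and the calls should be verified against the 1.13 javadocs):

{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.plugin.PluginManager;
import org.apache.flink.core.plugin.PluginUtils;

public class LoadFlinkConf {
    public static void main(String[] args) {
        // Load flink-conf.yaml from an explicit directory instead of relying on FLINK_CONF_DIR.
        Configuration conf = GlobalConfiguration.loadConfiguration("/path/to/flink/conf");

        // Initialize Flink's file systems with that configuration so that settings such as
        // fs.allowed-fallback-filesystems and the s3a credentials are picked up. The plugin
        // manager mirrors what a standalone cluster does at startup when it loads
        // flink-s3-fs-hadoop from the plugins directory.
        PluginManager pluginManager = PluginUtils.createPluginManagerFromRootFolder(conf);
        FileSystem.initialize(conf, pluginManager);

        // ... then continue with TableEnvironment.create(settings) as in Main.java below.
    }
}
{code}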

 

Main.java:
{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

public class Main {
    public static void main(String[] args) {
        // create a TableEnvironment for batch or streaming execution
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inBatchMode()
                .build();

        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // create an input Table
        TableResult tempResult = tableEnv.executeSql(
                //"create temporary table ATHLETES (\n" +
                "create table ATHLETES (\n" +
                "name varchar,\n" +
                "country varchar,\n" +
                "sport varchar\n" +
                ") with (\n" +
                "'connector' = 'filesystem',\n" +
                "'path'='s3a://testbucket/james_experiment/2020_Tokyo_Olympics/Athletes.csv',\n" +
                "'format'='csv'\n" +
                ")\n");

        TableResult table2 = tableEnv.executeSql("select * from ATHLETES");
    }
}
{code}
 

pom.xml:

 
{code:java}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>groupId</groupId>
    <artifactId>flink-ecs-sample</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
            <version>1.13.2</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.11</artifactId>
            <version>1.13.2</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.13.2</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>1.13.2</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>1.13.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.13.2</version>
        </dependency>
    </dependencies>
</project>
{code}
 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24285) Flink Table API Could not find any format factory for identifier 'csv' in the classpath.

2021-09-14 Thread James Kim (Jira)
James Kim created FLINK-24285:
-

 Summary: Flink Table API Could not find any format factory for 
identifier 'csv' in the classpath.
 Key: FLINK-24285
 URL: https://issues.apache.org/jira/browse/FLINK-24285
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API, Table SQL / Client
Affects Versions: 1.13.2
 Environment: Ubuntu 18.04
Reporter: James Kim


I'm trying to read a CSV file from S3-compatible storage through the s3a protocol 
from Java code.

 

This is the main class that I have:
{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

public class Main {
    public static void main(String[] args) {
        // create a TableEnvironment for batch or streaming execution
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inBatchMode()
                .build();

        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // create an input Table
        TableResult tempResult = tableEnv.executeSql(
                //"create temporary table ATHLETES (\n" +
                "create table ATHLETES (\n" +
                "name varchar,\n" +
                "country varchar,\n" +
                "sport varchar\n" +
                ") with (\n" +
                "'connector' = 'filesystem',\n" +
                "'path'='s3a://testbucket/expFolder/2020_Tokyo_Olympics/Athletes.csv',\n" +
                "'format'='csv'\n" +
                ")\n");

        TableResult table2 = tableEnv.executeSql("select * from ATHLETES");
    }
}
{code}
However, when I run this code, I get an exception at the executeSql call. The 
error log is the following: 
{code:java}
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.ATHLETES'.

Table options are:
'connector'='filesystem'
'format'='csv'
'path'='s3a://testbucket/james_experiment/2020_Tokyo_Olympics/Athletes.csv'

    at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:137)
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createDynamicTableSource(CatalogSourceTable.java:116)
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:82)
    at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:169)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:161)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:989)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:958)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:283)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:724)
    at Main.main(Main.java:31)
Caused by: org.apache.flink.table.api.ValidationException: Could not find any format factory for identifier 'csv' in the classpath.
    at org.apache.flink.table.filesystem.FileSystemTableSource.<init>(FileSystemTableSource.java:97)
    at org.apache.flink.table.filesystem.FileSystemTableFactory.createDynamicTableSource(FileSystemTableFactory.java:74)
    at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:134)
    ... 19 more
{code}
The exception 
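
As a quick classpath check (a hedged sketch, not taken from this report): Flink's 
generic factory discovery can be invoked directly, and if flink-csv is missing 
from the runtime classpath the call below fails with a similar validation error:

{code:java}
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.FactoryUtil;

public class CsvFactoryCheck {
    public static void main(String[] args) {
        // Throws a ValidationException if no factory with identifier "csv" is on the
        // classpath, i.e. if flink-csv is missing from the runtime dependencies.
        DeserializationFormatFactory factory = FactoryUtil.discoverFactory(
                Thread.currentThread().getContextClassLoader(),
                DeserializationFormatFactory.class,
                "csv");
        System.out.println("Found CSV format factory: " + factory.getClass().getName());
    }
}
{code}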

Re: Beam with Flink runner - Issues when writing to S3 in Parquet Format

2021-09-14 Thread David Morávek
Hi Sandeep,

Jan has already provided pretty good guidelines for getting more context on
the issue ;)

Because this is not the first time, I would like to raise awareness
that it's not OK to send a user-related question to four Apache mailing
lists (that I know of). Namely:

- u...@flink.apache.org
- dev@flink.apache.org
- u...@beam.apache.org
- d...@beam.apache.org

Community focus is a very precious resource that should be used wisely.
All of these mailing lists answer many complex questions each day,
and it's very unfortunate if any of this work needs to be duplicated. Next
time please direct Beam-related user questions solely to u...@beam.apache.org.

Thanks for your understanding. You can consult the community guidelines [1][2]
if you are not sure where a particular question belongs.

[1] https://flink.apache.org/community.html#mailing-lists
[2] https://beam.apache.org/community/contact-us/

Best,
D.

On Tue, Sep 14, 2021 at 5:47 PM Jan Lukavský  wrote:

> Hi Sandeep,
> a few questions:
>  a) which state backend do you use for Flink?
>  b) what is your checkpointingInterval set for FlinkRunner?
>  c) how much data is there in your input Kafka topic(s)?
>
> FileIO has to buffer all elements per window (by default) into state, so
> this might create a high pressure on state backend and/or heap, which could
> result in suboptimal performance. Due to the "connection loss" and timeout
> exceptions you describe I'd suppose there might be a lot of GC pressure.
>
>  Jan
> On 9/14/21 5:20 PM, Kathula, Sandeep wrote:
>
> Hi,
>
>We have a simple Beam application which reads from Kafka, converts to
> parquet and write to S3 with Flink runner (Beam 2.29 and Flink 1.12). We
> have a fixed window of 5 minutes after conversion to
> PCollection and then writing to S3. We have around 320
> columns in our data. Our intention is to write large files of size 128MB or
> more so that it won’t have a small file problem when reading back from
> Hive. But from what we observed it is taking too much memory to write to S3
> (giving memory of 8GB to heap is not enough to write 50 MB files and it is
> going OOM). When I increase memory for heap to 32GB then it take lot of
> time to write records to s3.
>
> For instance it takes:
>
>
>
> 20 MB file - 30 sec
>
> 50 MB file - 1 min 16 sec
>
> 75 MB file - 2 min 15 sec
>
> 83 MB file - 2 min 40 sec
>
>
>
> Code block to write to S3:
>
> PCollection parquetRecord = …….
>
>
>
> parquetRecord.apply(FileIO.*write*()
> .via(ParquetIO.*sink*(getOutput_schema()))
> .to(outputPath.isEmpty() ? outputPath() : outputPath)
> .withNumShards(5)
> .withNaming(new CustomFileNaming("snappy.parquet")));
>
>
>
>
>
> We are also getting different exceptions like:
>
>
>
>1. *UserCodeException*:
>
>
>
> Caused by: org.apache.beam.sdk.util.UserCodeException:
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator
>
> at
> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
>
> at
> com.intuit.data.platform.process.thrivev2.parquet.ParquetWriter$DoFnInvoker.invokeProcessElement(Unknown
> Source)
>
> at
> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
>
> at
> org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
>
> at
> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
>
> at
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601)
>
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>
> at
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1068)
>
> at
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1022)
>
> at
> org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
>
> at
> org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
>
> at
> 

Beam with Flink runner - Issues when writing to S3 in Parquet Format

2021-09-14 Thread Kathula, Sandeep
Hi,
   We have a simple Beam application which reads from Kafka, converts to 
parquet and writes to S3 with the Flink runner (Beam 2.29 and Flink 1.12). We have a 
fixed window of 5 minutes after conversion to PCollection and we then write to S3. 
We have around 320 columns in our data. Our intention is to write large files of 
128MB or more so that we won't have a small-file problem when reading back from 
Hive. But from what we observed, it takes too much memory to write to S3 (giving 
8GB of heap is not enough to write 50 MB files and it goes OOM). When I increase 
the heap to 32GB, it takes a lot of time to write records to S3.
For instance it takes:

20 MB file - 30 sec
50 MB file - 1 min 16 sec
75 MB file - 2 min 15 sec
83 MB file - 2 min 40 sec

Code block to write to S3:
PCollection parquetRecord = …….

parquetRecord.apply(FileIO.write()
.via(ParquetIO.sink(getOutput_schema()))
.to(outputPath.isEmpty() ? outputPath() : outputPath)
.withNumShards(5)
.withNaming(new CustomFileNaming("snappy.parquet")));


We are also getting different exceptions like:


  1.  UserCodeException:

Caused by: org.apache.beam.sdk.util.UserCodeException: 
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
Could not forward element to next operator
at 
org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
at 
com.intuit.data.platform.process.thrivev2.parquet.ParquetWriter$DoFnInvoker.invokeProcessElement(Unknown
 Source)
at 
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
at 
org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
at 
org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1068)
at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1022)
at 
org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
at 
org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
at 
org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
at 
org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)
at 
com.intuit.data.platform.process.thrivev2.flattener.JsonFlattener.lambda$processElement$0(JsonFlattener.java:36)
at java.lang.Iterable.forEach(Iterable.java:75)
at 
com.intuit.data.platform.process.thrivev2.flattener.JsonFlattener.processElement(JsonFlattener.java:34)
at 
com.intuit.data.platform.process.thrivev2.flattener.JsonFlattener$DoFnInvoker.invokeProcessElement(Unknown
 Source)
at 
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
at 
org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
at 
org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
at 

[jira] [Created] (FLINK-24284) Add a greeter and a showcase for the JavaScript SDK

2021-09-14 Thread Igal Shilman (Jira)
Igal Shilman created FLINK-24284:


 Summary: Add a greeter and a showcase for the JavaScript SDK
 Key: FLINK-24284
 URL: https://issues.apache.org/jira/browse/FLINK-24284
 Project: Flink
  Issue Type: Improvement
  Components: Stateful Functions
Reporter: Igal Shilman
Assignee: Igal Shilman


We need to add a greeter and a showcase for the Javascript SDK to the 
playground.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24283) Pulsar connector won't use given hash ranges in Key_Shared mode

2021-09-14 Thread Yufan Sheng (Jira)
Yufan Sheng created FLINK-24283:
---

 Summary: Pulsar connector won't use given hash ranges in 
Key_Shared mode
 Key: FLINK-24283
 URL: https://issues.apache.org/jira/browse/FLINK-24283
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Pulsar
Affects Versions: 1.14.0
Reporter: Yufan Sheng
 Fix For: 1.14.0


The Pulsar broker keeps the old consumer's selection if that consumer has been 
closed, which means the configured sticky key ranges do not take effect.

We should use a sticky hash range when seeking the initial position in the 
source enumerator.

https://github.com/apache/pulsar/pull/12035



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24282) KafkaRecordSerializationSchema TopicSelector is not serializable

2021-09-14 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-24282:
---

 Summary: KafkaRecordSerializationSchema TopicSelector is not 
serializable
 Key: FLINK-24282
 URL: https://issues.apache.org/jira/browse/FLINK-24282
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.14.0
Reporter: Fabian Paul


To dynamically calculate the outgoing topic, we allow passing a lambda. 
Unfortunately, it is currently not marked as serializable, hence the following 
code fails during closure cleaning when used within a job.
 
{code:java}
KafkaRecordSerializationSchema.builder()
        .setTopic(topic)
        .setValueSerializationSchema(serSchema)
        .setPartitioner(partitioner)
        .build();
{code}

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: Contribute to Flink StateFun

2021-09-14 Thread Igal Shilman
Hi Evans,
Welcome aboard :)  I'll definitely keep that in mind!
Meanwhile, I would also like to point out that StateFun is built on top of
Apache Flink, and any improvement in Flink directly contributes to StateFun,
so I would definitely encourage you to consider contributing to Flink
as well. I'm positive that the greater Flink dev community will appreciate
that.

Cheers,
Igal.


On Tue, Sep 14, 2021 at 12:59 PM Evans Ye  wrote:

> Hi team,
>
> I'm Evans Ye and I'd like to contribute more to the Flink StateFun project.
> Previously Gordon gave me a hand on mentoring me thus I had a lot of fun
> contributing to the multi-lang E2E tests framework.
> I know that the project PMC got a direction of the project, so it would be
> great if the team can share some JIRAs that's on the priority list and are
> open up for contributions.
> Thanks.
>
> - Evans
>


[jira] [Created] (FLINK-24281) Migrate all existing tests to new Kafka Sink

2021-09-14 Thread Fabian Paul (Jira)
Fabian Paul created FLINK-24281:
---

 Summary: Migrate all existing tests to new Kafka Sink
 Key: FLINK-24281
 URL: https://issues.apache.org/jira/browse/FLINK-24281
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka
Affects Versions: 1.14.0, 1.15.0
Reporter: Fabian Paul


The FlinkKafkaProducer has been deprecated since 1.14, but a lot of existing tests 
are still using it.

We should replace it with the KafkaSink, which completely subsumes it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24280) Support manual checkpoints triggering from a MiniCluster

2021-09-14 Thread Dawid Wysakowicz (Jira)
Dawid Wysakowicz created FLINK-24280:


 Summary: Support manual checkpoints triggering from a MiniCluster
 Key: FLINK-24280
 URL: https://issues.apache.org/jira/browse/FLINK-24280
 Project: Flink
  Issue Type: New Feature
  Components: Runtime / Checkpointing, Runtime / Coordination
Reporter: Dawid Wysakowicz
 Fix For: 1.15.0


The goal is to be able to trigger checkpoints manually at a desired time. The 
intention is to use it in tests. We do not want to make this a user-facing 
feature.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Contribute to Flink StateFun

2021-09-14 Thread Evans Ye
Hi team,

I'm Evans Ye and I'd like to contribute more to the Flink StateFun project.
Previously Gordon gave me a hand mentoring me, so I had a lot of fun
contributing to the multi-lang E2E test framework.
I know that the project PMC has a direction for the project, so it would be
great if the team could share some JIRAs that are on the priority list and
open for contributions.
Thanks.

- Evans


[jira] [Created] (FLINK-24279) Support withBroadcast in DataStream API

2021-09-14 Thread ZHANG ZHIPENG (Jira)
ZHANG ZHIPENG created FLINK-24279:
-

 Summary: Support withBroadcast in DataStream API
 Key: FLINK-24279
 URL: https://issues.apache.org/jira/browse/FLINK-24279
 Project: Flink
  Issue Type: New Feature
  Components: Library / Machine Learning
Reporter: ZHANG ZHIPENG


When doing machine learning with the DataStream API, we found that DataStream 
lacks a withBroadcast() function, which could be useful in machine learning.

 

A DataSet-based demo looks like this:

 

DataSet d1 = ...;

DataSet d2 = ...;

d1.map(new RichMapFunction() {
      @Override
      public Object map(Object aLong) throws Exception {
           // read the broadcast set registered as "d2" below
           List elements = getRuntimeContext().getBroadcastVariable("d2");

           ...
      }
}).withBroadcastSet(d2, "d2");
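
For context, the closest existing DataStream mechanism is the broadcast state 
pattern, which requires connecting the two streams instead of attaching a side 
input to an arbitrary rich function. A minimal sketch, with illustrative element 
types and names:

{code:java}
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class BroadcastStateSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> d1 = env.fromElements("a", "b", "c");
        DataStream<String> d2 = env.fromElements("x", "y");

        // Broadcast d2 into a map state that every parallel instance of the
        // downstream operator can read.
        MapStateDescriptor<String, String> d2Descriptor =
                new MapStateDescriptor<>("d2", Types.STRING, Types.STRING);
        BroadcastStream<String> broadcastD2 = d2.broadcast(d2Descriptor);

        d1.connect(broadcastD2)
                .process(new BroadcastProcessFunction<String, String, String>() {
                    @Override
                    public void processElement(String value, ReadOnlyContext ctx, Collector<String> out)
                            throws Exception {
                        // Read-only access to the broadcast side, roughly playing the role
                        // of getBroadcastVariable("d2") in the DataSet demo above.
                        out.collect(value + "/" + ctx.getBroadcastState(d2Descriptor).get("latest"));
                    }

                    @Override
                    public void processBroadcastElement(String value, Context ctx, Collector<String> out)
                            throws Exception {
                        ctx.getBroadcastState(d2Descriptor).put("latest", value);
                    }
                })
                .print();

        env.execute("broadcast-state-sketch");
    }
}
{code}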

 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24278) [FLIP-171] Async Sink Base Sink Developer Guide for Documentation

2021-09-14 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-24278:
--

 Summary: [FLIP-171] Async Sink Base Sink Developer Guide for 
Documentation
 Key: FLINK-24278
 URL: https://issues.apache.org/jira/browse/FLINK-24278
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / Common
Reporter: Zichen Liu
Assignee: Zichen Liu
 Fix For: 1.15.0


*User stories*
 * As a Sink user, I’d like to configure the batch size for items to send to 
the destination at once (e.g. “flush if there are x number of items in the 
batch”)
 * As a Sink user, I’d like to configure the batching logic so that I can flush 
the batch of requests based on time period (e.g. “flush every 2 seconds”)
 * As a Sink user, I’d like to specify the number of bytes for the batch of 
requests to be flushed (e.g. “submit the batch after the total number of bytes 
in it is above 1KB”)
 * As a Sink developer, I’d like to use the configuration mechanism provided to 
allow Sink users to configure my Sink implementation

*Scope*
 * Allow Sink developers and users to pass batch size config to the 
AsyncSinkWriter
 * Add support for time-based flushing (e.g. “flush after x milliseconds”) using 
the ProcessingTimeService which is part of the Sink interface
 * Add support for byte-based flushing
 * Consider the combination of time-based flushing and byte-based flushing: if 
there are more bytes than configured in the time-based batch, then the last few 
(however many necessary) items should go into the next batch to satisfy the 
requirement for the number of bytes (see the sketch after this list).
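
The combined flushing behaviour from the last scope item can be illustrated with 
a small, self-contained sketch. This is not the AsyncSinkWriter interface; the 
class, method and threshold names below are made up for illustration:

{code:java}
import java.util.ArrayList;
import java.util.List;

/**
 * Standalone sketch of the combined flush conditions described above. This is NOT
 * the AsyncSinkWriter API; all names and thresholds are illustrative only.
 */
class BatchFlushPolicySketch<T> {

    private final int maxBatchSize;          // "flush if there are x items in the batch"
    private final long maxBatchSizeInBytes;  // "flush once the batch is above 1KB"
    private final long maxTimeInBufferMs;    // "flush every 2 seconds"

    private final List<T> buffer = new ArrayList<>();
    private long bufferedBytes;
    private long oldestEntryTimestampMs = -1;

    BatchFlushPolicySketch(int maxBatchSize, long maxBatchSizeInBytes, long maxTimeInBufferMs) {
        this.maxBatchSize = maxBatchSize;
        this.maxBatchSizeInBytes = maxBatchSizeInBytes;
        this.maxTimeInBufferMs = maxTimeInBufferMs;
    }

    /** Buffers one element and reports whether the caller should flush now. */
    boolean add(T element, long sizeInBytes, long nowMs) {
        if (buffer.isEmpty()) {
            oldestEntryTimestampMs = nowMs;
        }
        buffer.add(element);
        bufferedBytes += sizeInBytes;
        return shouldFlush(nowMs);
    }

    /** True if any of the count-, byte- or time-based thresholds has been reached. */
    boolean shouldFlush(long nowMs) {
        return buffer.size() >= maxBatchSize
                || bufferedBytes >= maxBatchSizeInBytes
                || (oldestEntryTimestampMs >= 0
                        && nowMs - oldestEntryTimestampMs >= maxTimeInBufferMs);
    }

    /**
     * Hands the buffered elements to the caller and resets the buffer. The refinement
     * from the last scope item (rolling overflow items into the next batch when the
     * byte limit is exceeded) is deliberately left out of this sketch.
     */
    List<T> drainBatch() {
        List<T> batch = new ArrayList<>(buffer);
        buffer.clear();
        bufferedBytes = 0;
        oldestEntryTimestampMs = -1;
        return batch;
    }
}
{code}

A time-based trigger registered with the ProcessingTimeService mentioned above 
would then periodically call shouldFlush(...) and drain the batch when it 
returns true.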

*References*

More details to be found 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Unsubscribe

2021-09-14 Thread pang pan



[jira] [Created] (FLINK-24277) Offset commit should be disabled if consumer group ID is not specified in KafkaSource

2021-09-14 Thread Qingsheng Ren (Jira)
Qingsheng Ren created FLINK-24277:
-

 Summary: Offset commit should be disabled if consumer group ID is 
not specified in KafkaSource
 Key: FLINK-24277
 URL: https://issues.apache.org/jira/browse/FLINK-24277
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.14.0
Reporter: Qingsheng Ren
 Fix For: 1.14.0


FLINK-24051 made "group.id" an optional configuration in KafkaSource. However, 
KafkaSource will generate a random group id if user doesn't specify one, and 
this random ID is inconsistent after failover, and not even human readable.

A solution will be adding a configuration for offset commit on checkpoint, make 
it as true by default, and disable offset commit if group id is not specified. 
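
For illustration, the user-facing side could look roughly like the sketch below. 
The commit.offsets.on.checkpoint property is the toggle this ticket proposes (its 
name is an assumption here), and the broker address and topic are placeholders:

{code:java}
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

public class ExplicitGroupIdExample {
    public static void main(String[] args) {
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")            // placeholder
                .setTopics("input-topic")                         // placeholder
                // An explicit, stable group id keeps committed offsets meaningful across
                // failover; omitting it currently yields a random, non-human-readable id.
                .setGroupId("my-pipeline-consumer")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                // Hypothetical toggle proposed by this ticket: disable offset commit on
                // checkpoint when no group id is configured.
                .setProperty("commit.offsets.on.checkpoint", "false")
                .build();
        System.out.println(source);
    }
}
{code}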



--
This message was sent by Atlassian Jira
(v8.3.4#803005)