StreamingFileSink bulk formats - small files

2022-03-03 Thread Kamil ty
Hello all,

In multiple jobs I'm saving data using the DataStream API with
StreamingFileSink and various bulk formats (Avro, Parquet). As bulk formats
require a rolling policy that extends CheckpointRollingPolicy, I have created
a policy that additionally rolls on file size. Unfortunately, for some flows
the data volume is low, and even with a 128 MB max file size and a 6-hour
checkpoint interval I still end up with small files.
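
For reference, the policy I'm using is essentially the sketch below (simplified
and illustrative rather than the exact code; the class name is my own):

import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;

import java.io.IOException;

/** Rolls on every checkpoint (required for bulk formats) and additionally on part-file size. */
public class SizeAwareCheckpointRollingPolicy<IN, BucketID>
        extends CheckpointRollingPolicy<IN, BucketID> {

    private final long maxPartSizeBytes;

    public SizeAwareCheckpointRollingPolicy(long maxPartSizeBytes) {
        this.maxPartSizeBytes = maxPartSizeBytes;
    }

    @Override
    public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, IN element)
            throws IOException {
        // Roll as soon as the in-progress part file reaches the size threshold.
        return partFileState.getSize() >= maxPartSizeBytes;
    }

    @Override
    public boolean shouldRollOnProcessingTime(PartFileInfo<BucketID> partFileState, long currentTime) {
        return false;
    }
}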

I have seen that there are some open tickets for this, but are there any
solutions or workarounds for this in Flink 1.13.2?

Best regards
Kamil


Re: Example with JSONKeyValueDeserializationSchema?

2022-03-03 Thread Kamil ty
Hello,

Sorry for the late reply. I have checked the issue, and it is indeed a type
mismatch, as the exception suggests. The JSONKeyValueDeserializationSchema
included in Flink implements KafkaDeserializationSchema, the interface used by
the older FlinkKafkaConsumer, while the new KafkaSource's .setDeserializer
method expects a KafkaRecordDeserializationSchema.

My recommendation would be to implement your own JSON deserialization schema
against one of the interfaces the KafkaSource accepts: either a
KafkaRecordDeserializationSchema for .setDeserializer, or a plain
DeserializationSchema passed via .setValueOnlyDeserializer if you only need
the record value. You should even be able to copy the implementation from the
Flink-provided JSONKeyValueDeserializationSchema and adapt it to the new
interface.

If you run into issues implementing this, please let me know and I will
provide you with the code.
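
In case it is useful right away, here is a rough, untested sketch of such a
schema for the new KafkaSource (the class name and structure are just an
illustration, not the exact code). It mirrors the key/value/metadata ObjectNode
produced by the old JSONKeyValueDeserializationSchema, so the rest of the
pipeline can stay the same:

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.util.Collector;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.io.IOException;

/** Produces an ObjectNode with "key", "value" and optional "metadata" fields. */
public class JsonKeyValueRecordDeserializationSchema
        implements KafkaRecordDeserializationSchema<ObjectNode> {

    private final boolean includeMetadata;
    private transient ObjectMapper mapper;

    public JsonKeyValueRecordDeserializationSchema(boolean includeMetadata) {
        this.includeMetadata = includeMetadata;
    }

    @Override
    public void open(DeserializationSchema.InitializationContext context) {
        mapper = new ObjectMapper();
    }

    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<ObjectNode> out)
            throws IOException {
        ObjectNode node = mapper.createObjectNode();
        if (record.key() != null) {
            node.set("key", mapper.readTree(record.key()));
        }
        if (record.value() != null) {
            node.set("value", mapper.readTree(record.value()));
        }
        if (includeMetadata) {
            node.putObject("metadata")
                    .put("offset", record.offset())
                    .put("topic", record.topic())
                    .put("partition", record.partition());
        }
        out.collect(node);
    }

    @Override
    public TypeInformation<ObjectNode> getProducedType() {
        return TypeInformation.of(ObjectNode.class);
    }
}

It could then be passed to the builder with
.setDeserializer(new JsonKeyValueRecordDeserializationSchema(false)).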

Best regards
Kamil

On Mon, 7 Feb 2022, 15:15 HG,  wrote:

> Hello Kamil et all,
>
> When I build this code:
>
> KafkaSource source = KafkaSource.builder()
>         .setProperties(kafkaProps)
>         .setProperty("ssl.truststore.type", trustStoreType)
>         .setProperty("ssl.truststore.password", trustStorePassword)
>         .setProperty("ssl.truststore.location", trustStoreLocation)
>         .setProperty("security.protocol", securityProtocol)
>         .setProperty("partition.discovery.interval.ms", partitionDiscoveryIntervalMs)
>         .setProperty("commit.offsets.on.checkpoint", commitOffsetsOnCheckpoint)
>         .setGroupId(groupId)
>         .setTopics(kafkaInputTopic)
>         .setDeserializer(new JSONKeyValueDeserializationSchema(false))
>         .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
>         .build();
>
>
> I get:
> This error:
>
> error: incompatible types:
> org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema
> cannot be converted to
> org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema
> .setDeserializer(new
> JSONKeyValueDeserializationSchema(false))
>
>
> What am I doing wrong?
> As per the documentation JSONKeyValueDeserializationSchema returns an
> ObjectNode.
>
> Regards Hans-Peter
>
>
>
> Op vr 14 jan. 2022 om 20:32 schreef Kamil ty :
>
>> Hello Hans,
>>
>> As far as I know the JSONKeyValueDeserializationSchema returns a Jackson
>> ObjectNode. Below I have included an example based on Flink stable
>> documentation.
>>
>> KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
>>         .setBootstrapServers(brokers)
>>         .setTopics("input-topic")
>>         .setGroupId("my-group")
>>         .setStartingOffsets(OffsetsInitializer.earliest())
>>         .setDeserializer(new JSONKeyValueDeserializationSchema(false))
>>         .build();
>>
>> DataStream<ObjectNode> ds = env.fromSource(source,
>>         WatermarkStrategy.noWatermarks(), "Kafka Source");
>> // Access the JSON field stored in the ObjectNode.
>> DataStream<String> processedDs = ds.map(record ->
>>         record.get("value").get("my-field").asText());
>>
>> It is also possible to implement your own deserialization schema that, for
>> example, turns each record into a POJO. You can do this by implementing
>> KafkaDeserializationSchema
>> <https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/streaming/connectors/kafka/KafkaDeserializationSchema.html>.
>> If you are only interested in the value of the Kafka record, you can also
>> extend AbstractDeserializationSchema
>> <https://nightlies.apache.org/flink/flink-docs-stable/api/java/org/apache/flink/api/common/serialization/AbstractDeserializationSchema.html>
>> and use .setValueOnlyDeserializer(new CustomDeserializer()). There is also
>> a different API you could use for this, described in the KafkaSourceBuilder
>> Javadoc
>> <https://nightlies.apache.org/flink/flink-docs-stable/api/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.html>.
>> The custom deserializer will be the same for older Flink versions, but the
>> KafkaSource itself is new; for the previous Kafka source (FlinkKafkaConsumer),
>> see the Kafka connector documentation
>> <https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#kafka-sourcefunction>.
>>
>> Best Regards
>> Kamil
>>
>> On Fri, 14 Jan 2022 at 18:37, HG  wrote:
>>
>>> Hi,
>>>
>>> Before starting programming myself I'd like to know whether there are
>>> good examples with deserialization of JSON that I can borrow.
>>> The structure of the JSON is nested with multiple levels.
>>>
>>> Any references?
>>>
>>> 'better well stolen than badly invented myself' we'd say in Dutch😁
>>>
>>> Regards Hans
>>>
>>


Re: Example with JSONKeyValueDeserializationSchema?

2022-01-14 Thread Kamil ty
Hello Hans,

As far as I know the JSONKeyValueDeserializationSchema returns a Jackson
ObjectNode. Below I have included an example based on Flink stable
documentation.

KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
        .setBootstrapServers(brokers)
        .setTopics("input-topic")
        .setGroupId("my-group")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setDeserializer(new JSONKeyValueDeserializationSchema(false))
        .build();

DataStream<ObjectNode> ds = env.fromSource(source,
        WatermarkStrategy.noWatermarks(), "Kafka Source");
// Access the JSON field stored in the ObjectNode.
DataStream<String> processedDs = ds.map(record ->
        record.get("value").get("my-field").asText());

It is also possible to implement your own deserialization schema that, for
example, turns each record into a POJO. You can do this by implementing
KafkaDeserializationSchema
<https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/streaming/connectors/kafka/KafkaDeserializationSchema.html>.
If you are only interested in the value of the Kafka record, you can also
extend AbstractDeserializationSchema
<https://nightlies.apache.org/flink/flink-docs-stable/api/java/org/apache/flink/api/common/serialization/AbstractDeserializationSchema.html>
and use .setValueOnlyDeserializer(new CustomDeserializer()). There is also a
different API you could use for this, described in the KafkaSourceBuilder
Javadoc
<https://nightlies.apache.org/flink/flink-docs-stable/api/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.html>.
The custom deserializer will be the same for older Flink versions, but the
KafkaSource itself is new; for the previous Kafka source (FlinkKafkaConsumer),
see the Kafka connector documentation
<https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#kafka-sourcefunction>.
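
To illustrate the value-only route, a deserializer that turns each record into
a (hypothetical) POJO could look roughly like this untested sketch:

import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;

import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;

/** Hypothetical POJO matching the JSON value. */
class User {
    public long id;
    public String name;
}

/** Value-only deserializer that maps the Kafka record value onto the POJO above. */
public class UserDeserializer extends AbstractDeserializationSchema<User> {

    private transient ObjectMapper mapper;

    @Override
    public void open(InitializationContext context) {
        mapper = new ObjectMapper();
    }

    @Override
    public User deserialize(byte[] message) throws IOException {
        return mapper.readValue(message, User.class);
    }
}

It could then be plugged in with .setValueOnlyDeserializer(new UserDeserializer())
on the KafkaSource builder.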

Best Regards
Kamil

On Fri, 14 Jan 2022 at 18:37, HG  wrote:

> Hi,
>
> Before starting programming myself I'd like to know whether there are good
> examples with deserialization of JSON that I can borrow.
> The structure of the JSON is nested with multiple levels.
>
> Any references?
>
> 'better well stolen than badly invented myself' we'd say in Dutch😁
>
> Regards Hans
>


Flink per-job cluster HBaseSinkFunction fails before starting - Configuration issue

2022-01-14 Thread Kamil ty
Hello all,
I have a Flink job that uses the HBaseSinkFunction as specified here:
flink/flink-connectors/flink-connector-hbase-2.2 at master · a0x8o/flink (github.com)

I'm deploying the job to a cluster in YARN per-job mode, using flink run -d
job.jar.

The job gets accepted and I get the address of the web UI, but the job stays
in the CREATED state and never actually runs. After some time it stops.

This error stands out when looking at the logs:
WARN [main] org.apache.hadoop.security.LdapGroupsMapping: Exception while trying to get password for alias hadoop.security.group.mapping.ldap.bind.password:
java.io.IOException: Configuration problem with provider path.
    at org.apache.hadoop.conf.Configuration.getPasswordFromCredentialProviders(Configuration.java:2428)
    at org.apache.hadoop.conf.Configuration.getPassword(Configuration.java:2347)
    at org.apache.hadoop.security.LdapGroupsMapping.getPassword(LdapGroupsMapping.java:797)
    at org.apache.hadoop.security.LdapGroupsMapping.setConf(LdapGroupsMapping.java:680)
    at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:77)
    at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:137)
    at org.apache.hadoop.security.Groups.<init>(Groups.java:105)
    at org.apache.hadoop.security.Groups.<init>(Groups.java:101)
    at org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:476)
    at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:352)
    at org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:314)
    at org.apache.hadoop.security.UserGroupInformation.doSubjectLogin(UserGroupInformation.java:1996)
    at org.apache.hadoop.security.UserGroupInformation.createLoginUser(UserGroupInformation.java:743)
    at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:693)
    at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:604)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer.main(ContainerLocalizer.java:468)
Caused by: java.nio.file.AccessDeniedException: /var/run/.../process/1546359139-yarn-NODEMANAGER/creds.localjceks
    at sun.nio.fs.UnixException.translateToIOException(UnixException.java:84)
    at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
    at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
    at sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
    at java.nio.file.Files.newByteChannel(Files.java:361)
    at java.nio.file.Files.newByteChannel(Files.java:407)
    at java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:384)
    at java.nio.file.Files.newInputStream(Files.java:152)
    at org.apache.hadoop.security.alias.LocalKeyStoreProvider.getInputStreamForFile(LocalKeyStoreProvider.java:76)
    at org.apache.hadoop.security.alias.AbstractJavaKeyStoreProvider.locateKeystore(AbstractJavaKeyStoreProvider.java:325)
    at org.apache.hadoop.security.alias.AbstractJavaKeyStoreProvider.<init>(AbstractJavaKeyStoreProvider.java:86)
    at org.apache.hadoop.security.alias.LocalKeyStoreProvider.<init>(LocalKeyStoreProvider.java:56)
    at org.apache.hadoop.security.alias.LocalJavaKeyStoreProvider.<init>(LocalJavaKeyStoreProvider.java:42)
    at org.apache.hadoop.security.alias.LocalJavaKeyStoreProvider.<init>(LocalJavaKeyStoreProvider.java:34)
    at org.apache.hadoop.security.alias.LocalJavaKeyStoreProvider$Factory.createProvider(LocalJavaKeyStoreProvider.java:68)
    at org.apache.hadoop.security.alias.CredentialProviderFactory.getProviders(CredentialProviderFactory.java:73)
    at org.apache.hadoop.conf.Configuration.getPasswordFromCredentialProviders(Configuration.java:2409)
    ... 15 more

It looks as if it is trying to use password-based authentication, but only
Kerberos-based authentication should be used on this cluster.

The log output when scheduling the job might be a clue:
org.apache.flink.yarn.Utils [] - Attempting to obtain Kerberos security token for HBase
org.apache.flink.yarn.Utils [] - HBase is not available (not packaged with this application): ClassNotFoundException : "org.apache.hadoop.hbase.HBaseConfiguration".

The flink-connector-hbase-2.2 dependency is specified in the compile scope and
hadoop-common in the provided scope in the job's pom.xml. I have also tried
including more dependencies such as hbase-common and hbase-client, without any
luck.
I have also tried setting the HBASE_HOME, HBASE_CONF_DIR, HADOOP_CLASSPATH
and HBASE_CLASSPATH environment variables.
I have also made sure that the host where the job is dep

Re: Running a flink job with Yarn per-job mode from application code.

2021-12-09 Thread Kamil ty
Sorry for that. I will remember to include the mailing list in my responses.

The REST API approach will probably be sufficient. One more question regarding
this: is it possible to get the address/port of the REST API endpoint from
within the job? I see that when deploying a job to YARN, the rest.address and
rest.port keys are not set in the configuration obtained with
env.getConfiguration().toString().
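
For the archive, my rough (and so far untested) understanding of the per-job
submission you suggested earlier is something like the sketch below:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class YarnPerJobSubmitExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // As suggested: tell the environment to submit in YARN per-job mode.
        conf.setString("execution.target", "yarn-per-job");
        // flink-yarn and the Hadoop/YARN client configuration still need to be on the classpath.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        env.fromElements(1, 2, 3).print(); // placeholder pipeline
        env.execute("job-submitted-from-code");
    }
}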

On Thu, 9 Dec 2021 at 03:29, Caizhi Weng  wrote:

> Hi!
>
> Please make sure to always reply to the user mailing list so that everyone
> can see the discussion.
>
> You can't get the execution environment for an already running job but if
> you want to operate on that job you can try to get its JobClient instead.
> However this is somewhat complicated to get with Java code. If you're
> interested see ClusterClientJobClientAdapter and its usage.
>
> I would also recommend using REST API to operate on an existing job. See
> [1] for the APIs.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/
>
> Kamil ty  于2021年12月8日周三 21:15写道:
>
>> Thank you. I guess I will try this solution. One thing I'm unsure about
>> is if I will be able to get the execution environment in a function from an
>> already running job. Will get back with an response if this works.
>>
>> Best regards
>> Kamil
>>
>> On Wed, 8 Dec 2021, 03:20 Caizhi Weng,  wrote:
>>
>>> Hi!
>>>
>>> So you would like to submit a yarn job with Java code, not using
>>> /bin/flink run?
>>>
>>> If that is the case, you'll need to set 'execution.target' config option
>>> to 'yarn-per-job'. Set this in the configuration of ExecutionEnvironment
>>> and execute the job with Flink API as normal.
>>>
>>> Kamil ty  于2021年12月7日周二 19:16写道:
>>>
>>>> Hello all,
>>>>
>>>> I'm looking for a way to submit a Yarn job from another flink jobs
>>>> application code. I can see that you can access a cluster and submit jobs
>>>> with a RestClusterClient, but it seems a Yarn per-job mode is not supported
>>>> with it.
>>>>
>>>> Any suggestions would be appreciated.
>>>>
>>>> Best Regards
>>>> Kamil
>>>>
>>>


Running a flink job with Yarn per-job mode from application code.

2021-12-07 Thread Kamil ty
Hello all,

I'm looking for a way to submit a YARN job from another Flink job's
application code. I can see that you can access a cluster and submit jobs
with a RestClusterClient, but it seems YARN per-job mode is not supported
by it.

Any suggestions would be appreciated.

Best Regards
Kamil


Pyflink/Flink Java parquet streaming file sink for a dynamic schema stream

2021-12-02 Thread Kamil ty
Hello,

I'm wondering if there is a way to create a Parquet streaming file sink in
PyFlink (Table API) or in Java Flink (DataStream API).

To give an example of the expected behaviour: each element of the stream will
contain a JSON string. I want to save this stream to Parquet files without
having to explicitly define the schema/types of the messages (and using a
single sink).

If this is possible (perhaps in Java Flink using a custom
ParquetBulkWriterFactory, etc.), any direction for the implementation would
be appreciated.

Best regards
Kamil


Pyflink 1.13.2 convert datastream into table BIG_INT type

2021-11-28 Thread Kamil ty
Hello, I'm trying to convert a datastream into a table using:
table_env.from_data_stream(ds)

The ds contains some fields with the Types.BIG_INT() type. Those fields seem
to be converted to RAW('java.math.BigInteger', '...'). Flink treats this as an
error, which results in a table query and sink schema mismatch (incompatible
types): BIGINT in the sink schema vs. RAW('java.math.BigInteger', '...') in
the query.

Any help with this would be appreciated.

Best Regards
Kamil


Re: Flink CLI - pass command line arguments to a pyflink job

2021-11-23 Thread Kamil ty
Thank you Matthias and Dian!

I have verified this command:
bin/flink run -py examples/python/table/batch/word_count.py --test "Hello World"
where the "--test" argument is accessed from the Python code, and the
arguments work as expected.

Best regards
Kamil

On Tue, 23 Nov 2021 at 02:48, Dian Fu  wrote:

> Hi Kamil,
>
> It's documented at the end of the page:
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/cli/#submitting-pyflink-jobs
> .
>
> Regards,
> Dian
>
> On Tue, Nov 23, 2021 at 12:10 AM Matthias Pohl 
> wrote:
>
>> Hi Kamil,
>> afaik, the parameter passing should work as normal by just appending them
>> to the Flink job submission similar to the Java job submission:
>> ```
>> $ ./flink run --help
>> Action "run" compiles and runs a program.
>>   Syntax: run [OPTIONS]  
>> [...]
>> ```
>>
>> Matthias
>>
>> On Mon, Nov 22, 2021 at 3:58 PM Kamil ty  wrote:
>>
>>> Hey,
>>>
>>> Looking at the examples at Command-Line Interface | Apache Flink
>>> <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/cli/>
>>>  I
>>> don't see an example of passing command line arguments to a pyflink job
>>> when deploying the job to a remote cluster with flink cli. Is this
>>> supported?
>>>
>>> Best Regards
>>> Kamil
>>>
>>


Re: Table API Filesystem connector - disable interval rolling policy

2021-11-22 Thread Kamil ty
Hi,

The end result I'm expecting is to have only a few files, each filled up to a
specific file size, rather than many small files.

So if I understand you correctly, when I enable auto compaction, all the small
files that are being created will be merged into files of up to the specified
size?

E.g., with the default 128 MB max file size:
Without autocompaction:
output/
file1.avro - 28MB
file2.avro - 100MB
file3.avro - 30MB

With autocompaction:
output/
file1.avro - 128MB
file2.avro - 30MB

Am I understanding this correctly? If so, I think auto compaction is exactly
the solution to this issue.
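
For reference, the kind of sink definition I have in mind is roughly the
following (a sketch using the Java Table API; the column names and path are
placeholders):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CompactedAvroSinkExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // Filesystem sink with auto compaction; column names and the path are placeholders.
        tEnv.executeSql(
                "CREATE TABLE avro_sink ("
                        + "  user_id BIGINT,"
                        + "  payload STRING"
                        + ") WITH ("
                        + "  'connector' = 'filesystem',"
                        + "  'path' = 'hdfs:///data/output',"
                        + "  'format' = 'avro',"
                        + "  'auto-compaction' = 'true',"
                        + "  'compaction.file-size' = '128MB',"
                        + "  'sink.rolling-policy.file-size' = '128MB'"
                        + ")");

        // An INSERT INTO avro_sink ... statement then writes per-checkpoint files
        // that the sink compacts up to roughly compaction.file-size.
    }
}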

Best Regards
Kamil

On Mon, 22 Nov 2021 at 16:43, Francesco Guardiani 
wrote:

> Hi,
> Looking at the code, there is no ability to disable the rollover-interval.
>
> But I'm wondering, what are you trying to do? Write a file up to the
> configured file-size? Note that if you're using auto compaction, on every
> checkpoint you'll have a rollover, regardless of the rollover-interval.
>
> I cc'ed Fabian in the discussion which has a better knowledge than me on
> file sink relates topics.
>
> FG
>
> On Mon, Nov 22, 2021 at 3:51 PM Matthias Pohl 
> wrote:
>
>> Hi Kamil,
>> by looking at the code I'd say that the only option you have is to
>> increase the parameter you already mentioned to a very high number. But I'm
>> not sure about the side effects. I'm gonna add Francesco to this thread.
>> Maybe he has better ideas on how to answer your question.
>>
>> Best,
>> Matthias
>>
>> On Mon, Nov 22, 2021 at 10:32 AM Kamil ty  wrote:
>>
>>> Hey all,
>>>
>>> I wanted to know if there is a way to disable the interval rolling
>>> policy in the Table API filesystem connector.
>>> From flink docs: FileSystem | Apache Flink
>>> <https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/filesystem/#rolling-policy>
>>> The key to change the interval: sink.rolling-policy.rollover-interval
>>> Is it possible to fully disable this rolling policy or the only solution
>>> is to set a very big duration?
>>>
>>> Best Regards
>>> Kamil
>>>
>>


Flink CLI - pass command line arguments to a pyflink job

2021-11-22 Thread Kamil ty
Hey,

Looking at the examples at Command-Line Interface | Apache Flink, I don't see
an example of passing command-line arguments to a PyFlink job when deploying
the job to a remote cluster with the Flink CLI. Is this supported?

Best Regards
Kamil


Table API Filesystem connector - disable interval rolling policy

2021-11-22 Thread Kamil ty
Hey all,

I wanted to know if there is a way to disable the interval-based rolling
policy in the Table API filesystem connector.
From the Flink docs (FileSystem | Apache Flink), the key to change the
interval is sink.rolling-policy.rollover-interval.
Is it possible to fully disable this rolling policy, or is the only solution
to set a very long duration?

Best Regards
Kamil


Re: Deserialize generic kafka json message in pyflink. Single kafka topic, multiple message schemas (debezium).

2021-11-22 Thread Kamil ty
Hey,
This is a great solution for now, thanks. In the end I decided to use the
Table API and the RAW format, as I needed access to the Kafka event
timestamp.
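
For the archive, the table definition I ended up with is roughly along these
lines (shown here with the Java Table API, but the DDL is the same from
PyFlink; topic, servers and column names are placeholders):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class RawKafkaTableExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // The 'raw' format exposes the whole Kafka value as a single column; the
        // Kafka record timestamp is read through a metadata column.
        tEnv.executeSql(
                "CREATE TABLE raw_events ("
                        + "  payload STRING,"
                        + "  event_time TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'"
                        + ") WITH ("
                        + "  'connector' = 'kafka',"
                        + "  'topic' = 'input-topic',"
                        + "  'properties.bootstrap.servers' = 'broker:9092',"
                        + "  'properties.group.id' = 'my-group',"
                        + "  'scan.startup.mode' = 'earliest-offset',"
                        + "  'format' = 'raw'"
                        + ")");

        // The JSON in 'payload' can then be parsed per message (for example in a UDF),
        // so differently shaped payloads on the same topic are not a problem.
    }
}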

Thanks a lot.

Best Regards
Kamil

On Mon, 22 Nov 2021 at 02:31, Dian Fu  wrote:

> Hi Kamil,
>
> Actually FlinkKafkaConsumer expects a DeserializationSchema instead of
> JsonRowDeserialization and so I guess you could try SimpleStringSchema.
>
> Regards,
> Dian
>
> On Sat, Nov 20, 2021 at 5:55 AM Kamil ty  wrote:
>
>> Hello all,
>>
>> I'm working on a pyflink job that's supposed to consume json messages
>> from Kafka and save them to a partitioned avro file sink.
>> I'm having difficulties finding a solution on how to process the
>> messages, because there is only one kafka topic for multiple
>> message schemas. As pyflinks FlinkKafkaConsumer expects a
>> JsonRowDeserialization schema, I assume that all of the messages need a
>> constant defined schema. I expect the same for the Kafka Table API.
>>
>> The messages follow a general debezium message schema:
>> Example data taken from flink docs:
>>
>> {
>>   "schema": {...},
>>   "payload": {
>> "before": {
>>   "id": 111,
>>   "name": "scooter",
>>   "description": "Big 2-wheel scooter",
>>   "weight": 5.18
>> },
>> "after": {
>>   "id": 111,
>>   "name": "scooter",
>>   "description": "Big 2-wheel scooter",
>>   "weight": 5.15
>> },
>> "source": {...},
>> "op": "u",
>> "ts_ms": 1589362330904,
>> "transaction": null
>>   }}
>>
>> The messages are coming to a single Kafka topic, where the 'schema',
>> 'after', 'before' fields can be different for each message. The kafka
>> message key also contains the 'schema' field from the above example. My
>> question is if there is a way to process such messages coming from a single
>> Kafka topic with pyflink without writing a custom DeserializationSchema.
>> Any help would be appreciated.
>>
>> Kind Regards
>> Kamil
>>
>


Deserialize generic kafka json message in pyflink. Single kafka topic, multiple message schemas (debezium).

2021-11-19 Thread Kamil ty
Hello all,

I'm working on a PyFlink job that is supposed to consume JSON messages from
Kafka and save them to a partitioned Avro file sink.
I'm having difficulty finding a solution for how to process the messages,
because there is a single Kafka topic carrying multiple message schemas. As
PyFlink's FlinkKafkaConsumer expects a JsonRowDeserializationSchema, I assume
that all of the messages need a fixed, predefined schema. I expect the same
for the Kafka Table API.

The messages follow a general debezium message schema:
Example data taken from flink docs:

{
  "schema": {...},
  "payload": {
"before": {
  "id": 111,
  "name": "scooter",
  "description": "Big 2-wheel scooter",
  "weight": 5.18
},
"after": {
  "id": 111,
  "name": "scooter",
  "description": "Big 2-wheel scooter",
  "weight": 5.15
},
"source": {...},
"op": "u",
"ts_ms": 1589362330904,
"transaction": null
  }}

The messages arrive on a single Kafka topic, and the 'schema', 'after', and
'before' fields can differ for each message. The Kafka message key also
contains the 'schema' field from the example above. My question is whether
there is a way to process such messages from a single Kafka topic with PyFlink
without writing a custom DeserializationSchema.
Any help would be appreciated.

Kind Regards
Kamil


Re: Pyflink PyPi build - scala 2.12 compatibility

2021-11-10 Thread Kamil ty
Thank you for the clarification. This was very helpful!

Kind regards
Kamil

On Wed, 10 Nov 2021, 02:26 Dian Fu,  wrote:

> Hi Kamil,
>
> You are right that it comes with JAR packages of scala 2.11 in the PyFlink
> package as it has to select one version of JARs to bundle, either 2.11 or
> 2.12. Whether it works with scala 2.12 depends on how you submit your job.
> - If you execute the job locally, then it will use the JARs bundled in the
> PyFlink installation by default, that's scala 2.11. However, you could set
> the environment variable 'FLINK_HOME' [1] to the directory of a custom
> Flink distribution of 2.12 if you want to work with scala 2.12.
> - If you execute the job remotely, e.g using `flink run` to submit the job
> to a remote session cluster, YARN cluster, etc. Then it depends on the
> Flink distribution from which the `flink run` command refers to. If you
> want to work with scala 2.12, it should refer to a custom Flink
> distribution of 2.12.
>
> Regards,
> Dian
>
> [1]
> https://github.com/apache/flink/blob/master/flink-python/pyflink/find_flink_home.py#L46
>
> On Wed, Nov 10, 2021 at 3:12 AM Kamil ty  wrote:
>
>> Hello,
>>
>> Just wanted to verify if the default build of pyflink available from PyPi
>> is compatible with flink - scala version 2.12. I have noticed that the PyPi
>> pyflink version comes with apache-flink-libraries targeted for scala 2.11
>> only and I was wondering if this might be the cause of some issues that I'm
>> running into.
>>
>> Kind regards
>> Kamil
>>
>


Pyflink PyPi build - scala 2.12 compatibility

2021-11-09 Thread Kamil ty
Hello,

Just wanted to verify whether the default build of PyFlink available from PyPI
is compatible with Flink built for Scala 2.12. I have noticed that the PyPI
PyFlink package comes with apache-flink-libraries targeted at Scala 2.11 only,
and I was wondering if this might be the cause of some issues I'm running
into.

Kind regards
Kamil


Pyflink data stream API to Table API conversion with multiple sinks.

2021-10-08 Thread Kamil ty
Hello,
In my pyflink job I have such flow:

1. Use table API to get messages from Kafka
2. Convert the table to a data stream
3. Convert the data stream back to the table API
4. Use a statement set to write the data to two filesystem sinks (avro and
parquet)

I'm able to run the job and everything seems to be working, but the files are
not filling with data and remain in-progress.

I suspect that I'm doing something wrong with how I call .execute().

Currently, at the end of my script, I use:
statement_set.execute()
streaming_environment.execute("My job")

My question is: what would be the correct way to run a job with the flow
specified above? I can share the code if needed.

I would appreciate any help.

Kind regards
Kamil


Re: Pyflink job data stream to table conversion declareManagedMemory exception

2021-10-08 Thread Kamil ty
Hey, you are right. The issue was a Flink/PyFlink version mismatch. It turned
out Flink 1.12 was installed on the cluster; downgrading PyFlink from 1.12.3
to 1.12 fixed the issue.

Thank you for your help.

On Fri, 8 Oct 2021, 04:04 Dian Fu,  wrote:

> Hi Kamil,
>
> I have checked that this method exists in 1.12.3:
> https://github.com/apache/flink/blob/release-1.12.3/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java#L137
>
> Could you double check whether the Flink version is 1.12.3 (not just the
> PyFlink version)?
>
> Regards,
> Dian
>
>
>
> On Tue, Oct 5, 2021 at 11:34 PM Nicolaus Weidner <
> nicolaus.weid...@ververica.com> wrote:
>
>> Hi Kamil,
>>
>> On Tue, Oct 5, 2021 at 9:03 AM Kamil ty  wrote:
>>
>>> Hello,
>>>
>>> I'm trying to run a pyflink job in cluster mode (with yarn). My job
>>> contains source and sink definitions using Table API which are converted to
>>> a datastream and back. Unfortunately I'm getting an unusual exception at:
>>> *table = t_env.from_data_stream(ds, 'user_id, first_name, last_name).*
>>>
>>
>> Just to make sure: Is the missing quotation mark just a typo in your
>> mail, or your code (right before the closing bracket)?
>> *table = t_env.from_data_stream(ds, 'user_id, first_name, last_name['])*
>>
>> Best regards,
>> Nico
>>
>


Pyflink job data stream to table conversion declareManagedMemory exception

2021-10-05 Thread Kamil ty
Hello,

I'm trying to run a PyFlink job in cluster mode (on YARN). My job contains
source and sink definitions using the Table API, which are converted to a
datastream and back. Unfortunately I'm getting an unusual exception at:
*table = t_env.from_data_stream(ds, 'user_id, first_name, last_name).*

The exception is:
*Traceback (most recent call last):*
*  File "users_job.py", line 40, in *
*table = t_env.from_data_stream(ds, 'user_id, first_name, last_name)*
*  File
"/jobs/venv/lib/python3.7/site-packages/pyflink/table/table_environment.py",
line 1734, in from_data_stream*
*JPythonConfigUtil.declareManagedMemory(*
*  File "/jobs/venv/lib/python3.7/site-packages/py4j/java_gateway.py", line
1516, in __getattr__*
*"{0}.{1} does not exist in the JVM".format(self._fqn, name))*
*py4j.protocol.Py4JError:
org.apache.flink.python.util.PythonConfigUtil.declareManagedMemory does not
exist in the JVM*

Python version: 3.7 (venv built by the setup-python-environment.sh script
from documentation)
Flink version: 1.12.3

Any help would be appreciated.

Kind Regards
Kamil


Flink CEP in PyFlink

2021-09-07 Thread Kamil ty
Hello all,

I would like to use Flink CEP for my development requirements. Is Flink CEP
supported in PyFlink? If not, are there any available workarounds?

Kind regards
Kamil


PyFlink StreamingFileSink bulk-encoded format (Avro)

2021-08-17 Thread Kamil ty
Hello,

I'm trying to save my data stream to Avro files on HDFS. In the Flink
documentation I can only see explanations for Java/Scala, and I can't seem to
find a way to do it in PyFlink. Is this currently possible in PyFlink?

Kind Regards
Kamil