Re: Question: Determining Total Recovery Time

2020-02-10 Thread Arvid Heise
Hi Morgan,

as Timo pointed out, there is no general solution, but in your setting you
could look at the consumer lag of the input topic after a crash. Lag would
spike until all tasks have restarted and reprocessing begins, then drop back
as the job catches up. Note that by default offsets are only committed on
checkpoints.
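
A quick way to sample that lag from outside Flink is to compare the offsets
committed by the job's consumer group against the current log-end offsets. A
minimal sketch against the plain Kafka client APIs (broker address and group
id are placeholders for your setup):

import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.ByteArrayDeserializer

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")               // placeholder
props.put("group.id", "lag-probe")
props.put("key.deserializer", classOf[ByteArrayDeserializer].getName)
props.put("value.deserializer", classOf[ByteArrayDeserializer].getName)

val admin = AdminClient.create(props)
// Offsets committed by the Flink job's consumer group (advance on checkpoints)
val committed = admin
  .listConsumerGroupOffsets("my-flink-job-group")               // placeholder
  .partitionsToOffsetAndMetadata()
  .get()
  .asScala

// Current log-end offsets of the same partitions
val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
val endOffsets = consumer.endOffsets(committed.keySet.asJava).asScala

committed.foreach { case (tp, meta) =>
  println(s"$tp lag=${endOffsets(tp).longValue() - meta.offset()}")
}

consumer.close()
admin.close()

Once the lag has dropped back to its pre-failure level, the job has caught up.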

Best,

Arvid

On Tue, Feb 4, 2020 at 12:32 PM Timo Walther  wrote:

> Hi Morgan,
>
> as far as I know this is not possible mostly because measuring "till the
> point when the system catches up to the last message" is very
> pipeline/connector dependent. Some sources might need to read from the
> very beginning, some just continue from the latest checkpointed offset.
>
> Measuring things like that (e.g. for experiments) might require collecting
> your own metrics as part of your pipeline definition.
>
> Regards,
> Timo
>
>
> On 03.02.20 12:20, Morgan Geldenhuys wrote:
> > Community,
> >
> > I am interested in determining the total time to recover for a Flink
> > application after experiencing a partial failure. Let's assume a
> > pipeline consisting of Kafka -> Flink -> Kafka with Exactly-Once
> > guarantees enabled.
> >
> > Taking a look at the documentation
> > (https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/metrics.html),
>
> > one of the metrics which can be gathered is /recoveryTime/. However, as
> > far as I can tell this is only the time taken for the system to go from
> > an inconsistent state back into a consistent state, i.e. restarting the
> > job. Is there any way of measuring the amount of time taken from the
> > point when the failure occurred till the point when the system catches
> > up to the last message that was processed before the outage?
> >
> > Thank you very much in advance!
> >
> > Regards,
> > Morgan.
>
>
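
P.S. Regarding Timo's point above about collecting your own metrics as part of
the pipeline: a minimal sketch could be a gauge that exposes how far the last
processed element lags behind wall-clock time (assuming your events carry a
timestamp field; all names below are made up):

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.metrics.Gauge

// Hypothetical event type carrying its creation timestamp in milliseconds.
case class MyEvent(timestampMillis: Long, payload: String)

class LagTrackingMap extends RichMapFunction[MyEvent, MyEvent] {
  @volatile private var lastEventLagMillis: Long = 0L

  override def open(parameters: Configuration): Unit = {
    getRuntimeContext.getMetricGroup
      .gauge[Long, Gauge[Long]]("lastEventLagMillis", new Gauge[Long] {
        override def getValue: Long = lastEventLagMillis
      })
  }

  override def map(event: MyEvent): MyEvent = {
    // Update the lag on every processed element; the gauge is read lazily.
    lastEventLagMillis = System.currentTimeMillis() - event.timestampMillis
    event // pass the element through unchanged
  }
}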


Batch reading from Cassandra. How to?

2020-02-10 Thread Lasse Nedergaard
Hi.

We would like to do some batch analytics on our data set stored in
Cassandra and are looking for an efficient way to load data from a single
table. Not by key, but a random 15%, 50% or 100% of the rows.
Databricks has created an efficient way to load Cassandra data into Apache
Spark by reading from the underlying SSTables in parallel.
Do we have something similar in Flink, or what is the most efficient way
to load all, or a large random subset of, the data from a single Cassandra
table into Flink?
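
The closest thing we have found so far is the batch CassandraInputFormat from
flink-connector-cassandra; a rough sketch of what we have in mind is below
(host, keyspace, table and query are placeholders), though as far as we can
tell it issues a single CQL query rather than reading token ranges in
parallel the way the Spark connector does:

import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.java.ExecutionEnvironment
import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.api.java.typeutils.TupleTypeInfo
import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder
import com.datastax.driver.core.Cluster

val env = ExecutionEnvironment.getExecutionEnvironment

val clusterBuilder = new ClusterBuilder {
  override protected def buildCluster(builder: Cluster.Builder): Cluster =
    builder.addContactPoint("cassandra-host").build()           // placeholder
}

// One CQL query over the whole table (placeholder keyspace/table/columns)
val inputFormat = new CassandraInputFormat[Tuple2[String, String]](
  "SELECT id, payload FROM my_keyspace.my_table;", clusterBuilder)

val rows = env.createInput(
  inputFormat,
  new TupleTypeInfo[Tuple2[String, String]](
    BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO))

rows.first(10).print()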

Any suggestions and/or recommendations are highly appreciated.

Thanks in advance

Lasse Nedergaard


Re: SSL configuration - default behaviour

2020-02-10 Thread Krzysztof Chmielewski
Thanks Robert,
just a small suggestion maybe to change the documentation a little bit.

I'm not sure if it's only my impression, but from the sentence
*"All internal connections are SSL authenticated and encrypted"* I initially
thought that this was the default configuration.

Thanks,
Krzysztof

On Mon, Feb 10, 2020 at 15:12 Robert Metzger wrote:

> Hi,
>
> thanks a lot for your message. By default, internal connections are not
> encrypted.
>
> On Fri, Feb 7, 2020 at 4:08 PM KristoffSC 
> wrote:
>
>> Hi,
>> In documentation [1] we can read that
>>
>> All internal connections are SSL authenticated and encrypted. The
>> connections use mutual authentication, meaning both server and client side
>> of each connection need to present the certificate to each other. The
>> certificate acts effectively as a shared secret.
>>
>> But is this a default behavior? Are internal connections encrypted by
>> default?
>>
>> [1]
>>
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/security-ssl.html
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>


Re: [VOTE] Release Flink Python API(PyFlink) 1.9.2 to PyPI, release candidate #1

2020-02-10 Thread Dian Fu
+1 (non-binding)

- Verified the signature and checksum
- Pip installed the package successfully: pip install apache-flink-1.9.2.tar.gz
- Run word count example successfully.

Regards,
Dian

> On Feb 11, 2020, at 11:44 AM, jincheng sun wrote:
> 
> 
> +1 (binding) 
> 
> - Install the PyFlink by `pip install` [SUCCESS]
> - Run word_count in both command line and IDE [SUCCESS]
> 
> Best,
> Jincheng
> 
> 
> 
> On Tue, Feb 11, 2020 at 11:17 AM, Wei Zhong wrote:
> Hi,
> 
> Thanks for driving this, Jincheng.
> 
> +1 (non-binding) 
> 
> - Verified signatures and checksums.
> - Verified README.md and setup.py.
> - Run `pip install apache-flink-1.9.2.tar.gz` in Python 2.7.15 and Python 
> 3.7.5 successfully.
> - Start local pyflink shell in Python 2.7.15 and Python 3.7.5 via 
> `pyflink-shell.sh local` and try the examples in the help message, run well 
> and no exception.
> - Try a word count example in IDE with Python 2.7.15 and Python 3.7.5, run 
> well and no exception.
> 
> Best,
> Wei
> 
> 
>> On Feb 10, 2020, at 19:12, jincheng sun wrote:
>> 
>> Hi everyone,
>> 
>> Please review and vote on the release candidate #1 for the PyFlink version 
>> 1.9.2, as follows:
>> 
>> [ ] +1, Approve the release
>> [ ] -1, Do not approve the release (please provide specific comments)
>> 
>> The complete staging area is available for your review, which includes:
>> 
>> * the official Apache binary convenience releases to be deployed to 
>> dist.apache.org  [1], which are signed with the key 
>> with fingerprint 8FEA1EE9D0048C0CCC70B7573211B0703B79EA0E [2] and built from 
>> source code [3].
>> 
>> The vote will be open for at least 72 hours. It is adopted by majority 
>> approval, with at least 3 PMC affirmative votes.
>> 
>> Thanks,
>> Jincheng
>> 
>> [1] https://dist.apache.org/repos/dist/dev/flink/flink-1.9.2-rc1/ 
>> 
>> [2] https://dist.apache.org/repos/dist/release/flink/KEYS 
>> 
>> [3] https://github.com/apache/flink/tree/release-1.9.2 
>> 


Re: [VOTE] Release Flink Python API(PyFlink) 1.9.2 to PyPI, release candidate #1

2020-02-10 Thread jincheng sun
+1 (binding)

- Install the PyFlink by `pip install` [SUCCESS]
- Run word_count in both command line and IDE [SUCCESS]

Best,
Jincheng



Wei Zhong wrote on Tue, Feb 11, 2020 at 11:17 AM:

> Hi,
>
> Thanks for driving this, Jincheng.
>
> +1 (non-binding)
>
> - Verified signatures and checksums.
> - Verified README.md and setup.py.
> - Run `pip install apache-flink-1.9.2.tar.gz` in Python 2.7.15 and Python
> 3.7.5 successfully.
> - Start local pyflink shell in Python 2.7.15 and Python 3.7.5 via
> `pyflink-shell.sh local` and try the examples in the help message, run well
> and no exception.
> - Try a word count example in IDE with Python 2.7.15 and Python 3.7.5, run
> well and no exception.
>
> Best,
> Wei
>
>
> On Feb 10, 2020, at 19:12, jincheng sun wrote:
>
> Hi everyone,
>
> Please review and vote on the release candidate #1 for the PyFlink version
> 1.9.2, as follows:
>
> [ ] +1, Approve the release
> [ ] -1, Do not approve the release (please provide specific comments)
>
> The complete staging area is available for your review, which includes:
>
> * the official Apache binary convenience releases to be deployed to
> dist.apache.org [1], which are signed with the key with fingerprint
> 8FEA1EE9D0048C0CCC70B7573211B0703B79EA0E [2] and built from source code [3].
>
> The vote will be open for at least 72 hours. It is adopted by majority
> approval, with at least 3 PMC affirmative votes.
>
> Thanks,
> Jincheng
>
> [1] https://dist.apache.org/repos/dist/dev/flink/flink-1.9.2-rc1/
> [2] https://dist.apache.org/repos/dist/release/flink/KEYS
> [3] https://github.com/apache/flink/tree/release-1.9.2
>
>
>


Re: [VOTE] Release Flink Python API(PyFlink) 1.9.2 to PyPI, release candidate #1

2020-02-10 Thread Wei Zhong
Hi,

Thanks for driving this, Jincheng.

+1 (non-binding) 

- Verified signatures and checksums.
- Verified README.md and setup.py.
- Run `pip install apache-flink-1.9.2.tar.gz` in Python 2.7.15 and Python 3.7.5 
successfully.
- Start local pyflink shell in Python 2.7.15 and Python 3.7.5 via 
`pyflink-shell.sh local` and try the examples in the help message, run well and 
no exception.
- Try a word count example in IDE with Python 2.7.15 and Python 3.7.5, run well 
and no exception.

Best,
Wei


> On Feb 10, 2020, at 19:12, jincheng sun wrote:
> 
> Hi everyone,
> 
> Please review and vote on the release candidate #1 for the PyFlink version 
> 1.9.2, as follows:
> 
> [ ] +1, Approve the release
> [ ] -1, Do not approve the release (please provide specific comments)
> 
> The complete staging area is available for your review, which includes:
> 
> * the official Apache binary convenience releases to be deployed to 
> dist.apache.org  [1], which are signed with the key 
> with fingerprint 8FEA1EE9D0048C0CCC70B7573211B0703B79EA0E [2] and built from 
> source code [3].
> 
> The vote will be open for at least 72 hours. It is adopted by majority 
> approval, with at least 3 PMC affirmative votes.
> 
> Thanks,
> Jincheng
> 
> [1] https://dist.apache.org/repos/dist/dev/flink/flink-1.9.2-rc1/ 
> 
> [2] https://dist.apache.org/repos/dist/release/flink/KEYS 
> 
> [3] https://github.com/apache/flink/tree/release-1.9.2 
> 


Re:Re: Flink connect hive with hadoop HA

2020-02-10 Thread sunfulin
Hi guys,
Thanks for the kind reply. Actually I want to know how to change the client-side
Hadoop conf while using the Table API within my program. Hoping for some useful
suggestions.

At 2020-02-11 02:42:31, "Bowen Li"  wrote:

Hi sunfulin,


Sounds like you didn't config the hadoop HA correctly on the client side 
according to [1]. Let us know if it helps resolve the issue.


[1] 
https://stackoverflow.com/questions/25062788/namenode-ha-unknownhostexception-nameservice1

On Mon, Feb 10, 2020 at 7:11 AM Khachatryan Roman  
wrote:

Hi,


Could you please provide a full stacktrace?


Regards,
Roman




On Mon, Feb 10, 2020 at 2:12 PM sunfulin  wrote:

Hi, guys
I am using Flink 1.10 and testing functional cases with Hive integration: Hive
1.1.0-cdh5.3.0 with Hadoop HA enabled. Running the Flink job I can see a
successful connection with the Hive metastore, but cannot read table data; it
fails with this exception:


java.lang.IllegalArgumentException: java.net.UnknownHostException: nameservice1
at 
org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:374)
at 
org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:310)
at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:668)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:604)
at 
org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:148)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2598)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)


I am running a standalone application. Looks like I am missing my Hadoop conf
file in my Flink job application classpath. Where should I configure this?


Re: merge implementation in count distinct

2020-02-10 Thread Jark Wu
Hi Fanbin,

Thanks for reporting this. I think you are right, the implementation is not
correct. I have created a JIRA issue [1] to fix this.
Btw, the CountDistinctWithMerge in blink planner is implemented correctly
[2].

Best,
Jark

[1]: https://issues.apache.org/jira/browse/FLINK-15979
[2]:
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/utils/JavaUserDefinedAggFunctions.java#L369


On Tue, 11 Feb 2020 at 03:36, Fanbin Bu  wrote:

> Hi,
>
> For the following implementation of merge,
> https://github.com/apache/flink/blob/0ab1549f52f1f544e8492757c6b0d562bf50a061/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java#L224
>
> what if acc has the same keys in mergeAcc? The merged count would not be
> accurate then. I think the count should be incremented by one in
> https://github.com/apache/flink/blob/0ab1549f52f1f544e8492757c6b0d562bf50a061/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java#L236
>
>
> Thanks,
> Fanbin
>
>
>


merge implementation in count distinct

2020-02-10 Thread Fanbin Bu
Hi,

For the following implementation of merge,
https://github.com/apache/flink/blob/0ab1549f52f1f544e8492757c6b0d562bf50a061/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java#L224

what if acc has the same keys in mergeAcc? The merged count would not be
accurate then. I think the count should be incremented by one in
https://github.com/apache/flink/blob/0ab1549f52f1f544e8492757c6b0d562bf50a061/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java#L236
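
In other words, roughly what I would expect merge to do is something like this
sketch (a plain Scala map for illustration only, not the actual accumulator
class from the repo):

case class CountDistinctAcc(var counts: Map[String, Long] = Map.empty,
                            var distinct: Long = 0L)

// Only bump the distinct count for keys that are not yet present in acc.
def merge(acc: CountDistinctAcc, other: CountDistinctAcc): CountDistinctAcc = {
  other.counts.foreach { case (key, cnt) =>
    if (!acc.counts.contains(key)) {
      acc.distinct += 1
    }
    acc.counts += key -> (acc.counts.getOrElse(key, 0L) + cnt)
  }
  acc
}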


Thanks,
Fanbin


Re: Flink connect hive with hadoop HA

2020-02-10 Thread Bowen Li
Hi sunfulin,

Sounds like you didn't configure the Hadoop HA correctly on the client side
according to [1]. Let us know if it helps resolve the issue.

[1]
https://stackoverflow.com/questions/25062788/namenode-ha-unknownhostexception-nameservice1
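
For reference, the client-side part usually boils down to an hdfs-site.xml with
the standard HDFS HA properties on the classpath (or in the directory that
HADOOP_CONF_DIR points to), roughly like this; the nameservice id and hostnames
are placeholders for your cluster:

<configuration>
  <property>
    <name>dfs.nameservices</name>
    <value>nameservice1</value>
  </property>
  <property>
    <name>dfs.ha.namenodes.nameservice1</name>
    <value>nn1,nn2</value>
  </property>
  <property>
    <name>dfs.namenode.rpc-address.nameservice1.nn1</name>
    <value>namenode1.example.com:8020</value>
  </property>
  <property>
    <name>dfs.namenode.rpc-address.nameservice1.nn2</name>
    <value>namenode2.example.com:8020</value>
  </property>
  <property>
    <name>dfs.client.failover.proxy.provider.nameservice1</name>
    <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
  </property>
</configuration>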




On Mon, Feb 10, 2020 at 7:11 AM Khachatryan Roman <
khachatryan.ro...@gmail.com> wrote:

> Hi,
>
> Could you please provide a full stacktrace?
>
> Regards,
> Roman
>
>
> On Mon, Feb 10, 2020 at 2:12 PM sunfulin  wrote:
>
>> Hi, guys
>> I am using Flink 1.10 and testing functional cases with Hive integration:
>> Hive 1.1.0-cdh5.3.0 with Hadoop HA enabled. Running the Flink job I can
>> see a successful connection with the Hive metastore, but cannot read table
>> data; it fails with this exception:
>>
>> java.lang.IllegalArgumentException: java.net.UnknownHostException:
>> nameservice1
>> at
>> org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:374)
>> at
>> org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:310)
>> at
>> org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)
>> at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:668)
>> at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:604)
>> at
>> org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:148)
>> at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2598)
>> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
>>
>> I am running a standalone application. Looks like I am missing my Hadoop
>> conf file in my Flink job application classpath. Where should I configure this?
>>
>>
>>
>>
>


Re: Flink Minimal requirements

2020-02-10 Thread Khachatryan Roman
Hi Kristof,

Flink doesn't have any specific requirements.
You can run Flink on a single node with just one core. The number of
threads is dynamic.

However, you'll probably want to configure memory usage if the default
values are greater than what the actual machine has.
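
For example, with Flink 1.10's memory model you could cap memory in
flink-conf.yaml roughly like this (values are just placeholders; on older
versions the key would be taskmanager.heap.size instead of
taskmanager.memory.process.size):

jobmanager.heap.size: 1024m
taskmanager.memory.process.size: 1728m
taskmanager.numberOfTaskSlots: 1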

Regards,
Roman


On Mon, Feb 10, 2020 at 9:02 AM KristoffSC 
wrote:

> Hi all,
> well, this may be a bit of a strange question, but are there any minimal
> machine requirements (memory size, CPU, etc.) and non-functional
> requirements
> (number of nodes, network ports, etc.) for Flink?
>
> I know it all boils down to what my deployed Job will be, but if we just
> could put this aside for a moment and focus on a bare minimum just for
> Flink.
>
> Probably we can say that Flink requires a minimum of 2 nodes, right?
> What about the minimal memory needed for the Flink runtime? How many threads
> does Flink's runtime use?
>
> Any thought about this one?
>
> Thanks,
> Krzysztof
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


FlinkCEP questions - architecture

2020-02-10 Thread Juergen Donnerstag
Hi,

we're in the very early stages of evaluating options. I'm not a Flink expert,
but I did read some of the docs and watched videos. Could you please help me
understand if and how certain of our requirements are covered by Flink (CEP)?
Is this mailing list the right channel for such questions?

1) We receive files every day, which are exports from some database tables,
containing ONLY the changes from that day. Most tables have modify-columns.
Even though they are files, because they contain changes only I believe the
file records should be considered events in Flink terminology. Is that
assumption correct?

2) The records within the DB export files are NOT in chronological order, and
we can not change the export. Our use case is a "complex event processing"
case (FlinkCEP) with rules like "KeyBy(someKey) If first A, then B, then C
within 30 days, then do something". Does that work with FlinkCEP despite
the events/records are not in chrono order within the file? The files are
100MB to 20GB in size. Do I need to sort the files first before CEP
processing?

3) Occasionally some crazy people manually "correct" DB records within the
database and manually trigger a re-export of ALL of the changes for that
respective day (e.g. last week's Tuesday). Consequently we receive a
correction file: same filename but with "_1" appended. All filenames include the
date (of the original export). What are the options to handle that case
(besides telling the DB admins not to, which we did already)? Regular
checkpoints and re-process all files since then?  What happens to the CEP
state? Will it be checkpointed as well?

4) Our CEP rules span up to 180 days (i.e. 6 months). Is that a problem?

5) We also have CEP rules that must fire if, after a start sequence matched,
the remaining sequence did NOT complete within a configured window. E.g. if A,
then
B, but C did not occur within 30 days since A. Is that supported by
FlinkCEP? I couldn't find a working example.

6) We expect 30-40 CEP rules. How can we estimate the required storage size
for the temporary CEP state? Is there some sort of formula considering the
number of rules, number of records per file or day, record size, window,
number of records matched per sequence, number of keyBy grouping keys, ...

7) I can imagine that for debugging reasons it'd be good if we were able to
query the temporary CEP state. What is the (CEP) schema used to persist the
CEP state and how can we query it? And does such a query work on the whole
cluster or only per node (e.g. because of shuffle and nodes responsible
only for a portion of the events).

8) I understand state is stored per node. What happens if I want to add or
remove nodes? Will the state still be found, despite it being stored on
another node? I read that I need to be equally careful when changing rules?
Or is that a different issue?

9) How does garbage collection of temporary CEP state work, or will it stay
forever? For tracing/investigation reasons I can imagine that purging it
at the earliest possible time is not always the best option; maybe 30 days
later or so.

10) Are there strategies to minimize temporary CEP state? In SQL queries you
filter first on the "smallest" attributes. CEP rules form a sequence.
Hence that approach will not work. Is that an issue at all? What are
practical limits on the CEP temp state storage engine?

11) Occasionally we need to process about 200 files at once. Can I speed
things up by processing all files in parallel on multiple nodes, despite
their sequence (CEP use case)? This would only work if FlinkCEP in step 1
simply filters on all relevant events of a sequence, updates state, and in
a step 2 - after the files are processed - evaluates the updated state if
that meets the sequences.

12) Schema changes in the input files: occasionally the DB source system
schema is changed, and not always in a backwards-compatible way (new fields
inserted in the middle), and the export will then have the new field in the
middle as well. This means that starting from a specific (file) date, I need
to consider a different schema. This must also be handled when re-running
files for the last month because of the corrections provided. And the file
format may have changed somewhere in the middle of that range ...

thanks a lot for your time and your help
Juergen


Re: Exactly once semantics for hdfs sink

2020-02-10 Thread Khachatryan Roman
Hi Vishwas,

Yes, Streaming File Sink does support exactly-once semantics and can be
used with HDFS.
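
A minimal row-format sketch would look roughly like this (the path is a
placeholder, and `stream` is assumed to be a DataStream[String]); note that
checkpointing must be enabled, since part files are only finalized on
checkpoints:

import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink

val sink: StreamingFileSink[String] = StreamingFileSink
  .forRowFormat(new Path("hdfs:///data/output"),
                new SimpleStringEncoder[String]("UTF-8"))
  .build()

stream.addSink(sink)   // stream: DataStream[String], checkpointing enabled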

Regards,
Roman


On Mon, Feb 10, 2020 at 5:20 PM Vishwas Siravara  wrote:

> Hi all,
> I want to use the StreamingFileSink for writing data to HDFS. Can I
> achieve exactly-once semantics with this sink?
>
>
> Best,
> HW.
>


Exactly once semantics for hdfs sink

2020-02-10 Thread Vishwas Siravara
Hi all,
I want to use the StreamingFileSink for writing data to HDFS. Can I
achieve exactly-once semantics with this sink?


Best,
HW.


Re: Flink connect hive with hadoop HA

2020-02-10 Thread Khachatryan Roman
Hi,

Could you please provide a full stacktrace?

Regards,
Roman


On Mon, Feb 10, 2020 at 2:12 PM sunfulin  wrote:

> Hi, guys
> I am using Flink 1.10 and testing functional cases with Hive integration:
> Hive 1.1.0-cdh5.3.0 with Hadoop HA enabled. Running the Flink job I can
> see a successful connection with the Hive metastore, but cannot read table
> data; it fails with this exception:
>
> java.lang.IllegalArgumentException: java.net.UnknownHostException:
> nameservice1
> at
> org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:374)
> at
> org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:310)
> at
> org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)
> at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:668)
> at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:604)
> at
> org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:148)
> at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2598)
> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
>
> I am running a standalone application. Looks like I am missing my Hadoop
> conf file in my Flink job application classpath. Where should I configure this?
>
>
>
>


Re: Flink Elasticsearch upsert document in ES

2020-02-10 Thread ORIOL LOPEZ SANCHEZ
When building the request, you should build an UpdateRequest, like the 
following snippet:

import org.elasticsearch.action.update.UpdateRequest
import org.elasticsearch.common.xcontent.XContentType

val id: String = ???
val doc: String = ???
val targetIndex: String = ???
val indexType: Option[String] = ???

new UpdateRequest()
  .index(targetIndex)
  .`type`(indexType.getOrElse("_doc"))
  .id(id)
  .upsert(doc, XContentType.JSON)
  .doc(doc, XContentType.JSON)
  .docAsUpsert(true)

I'm not entirely sure if you need both the "doc" and "upsert" fields, as I think
this depends on the Elasticsearch version you're using.


De: ApoorvK 
Enviat el: dilluns, 10 de febrer de 2020 15:33
Per a: user@flink.apache.org 
Tema: Flink Elasticsearch upsert document in ES

Team,
Presently I have added Elasticsearch as a sink to a stream and am inserting the
JSON data. The problem is that when I restore the application after a crash, it
reprocesses the data in between (meanwhile a backend application has updated the
document in ES), and Flink re-inserts the document in ES, so all updates to ES
are lost.

I am trying to do an update, or an insert in case the document is not found, or
no insert if the document is already there.


I have tried providing opType to the Elasticsearch builder; I am getting an
error message "document already exists" on my console, but it still updates
the value in Elasticsearch:

val jsonString = write(record)
val rqst: IndexRequest = Requests.indexRequest
  .index(parameter.get("esIndexName"))
  .`type`(parameter.get("esIndexType"))
  .id(record.getApi_key + "_" + record.getOrder_id)
  .source(jsonString, XContentType.JSON)
  .opType(OpType.CREATE)



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Flink Elasticsearch upsert document in ES

2020-02-10 Thread ApoorvK
Team,
Presently I have added Elasticsearch as a sink to a stream and am inserting the
JSON data. The problem is that when I restore the application after a crash, it
reprocesses the data in between (meanwhile a backend application has updated the
document in ES), and Flink re-inserts the document in ES, so all updates to ES
are lost.

I am trying to do an update, or an insert in case the document is not found, or
no insert if the document is already there.


I have tried providing opType to the Elasticsearch builder; I am getting an
error message "document already exists" on my console, but it still updates
the value in Elasticsearch:

val jsonString = write(record)
val rqst: IndexRequest = Requests.indexRequest
  .index(parameter.get("esIndexName"))
  .`type`(parameter.get("esIndexType"))
  .id(record.getApi_key + "_" + record.getOrder_id)
  .source(jsonString, XContentType.JSON)
  .opType(OpType.CREATE)



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: SSL configuration - default behaviour

2020-02-10 Thread Robert Metzger
Hi,

thanks a lot for your message. By default, internal connections are not
encrypted.
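
If you want to turn it on, something along these lines in flink-conf.yaml
should be the starting point (paths and passwords are placeholders; the
security-ssl documentation page describes how to generate the stores):

security.ssl.internal.enabled: true
security.ssl.internal.keystore: /path/to/conf/internal.keystore
security.ssl.internal.truststore: /path/to/conf/internal.keystore
security.ssl.internal.keystore-password: changeit
security.ssl.internal.key-password: changeit
security.ssl.internal.truststore-password: changeit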

On Fri, Feb 7, 2020 at 4:08 PM KristoffSC 
wrote:

> Hi,
> In documentation [1] we can read that
>
> All internal connections are SSL authenticated and encrypted. The
> connections use mutual authentication, meaning both server and client side
> of each connection need to present the certificate to each other. The
> certificate acts effectively as a shared secret.
>
> But is this a default behavior? Are internal connections encrypted by
> default?
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/security-ssl.html
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: [DISCUSS] Drop connectors for Elasticsearch 2.x and 5.x

2020-02-10 Thread Flavio Pompermaier
+1 for dropping all Elasticsearch connectors < 6.x

On Mon, Feb 10, 2020 at 2:45 PM Dawid Wysakowicz 
wrote:

> Hi all,
>
> As described in this https://issues.apache.org/jira/browse/FLINK-11720
> ticket our elasticsearch 5.x connector does not work out of the box on
> some systems and requires a version bump. This also happens for our e2e.
> We cannot bump the version in es 5.x connector, because 5.x connector
> shares a common class with 2.x that uses an API that was replaced in 5.2.
>
> Both versions are already long eol: https://www.elastic.co/support/eol
>
> I suggest to drop both connectors 5.x and 2.x. If it is too much to drop
> both of them, I would strongly suggest dropping at least 2.x connector
> and update the 5.x line to a working es client module.
>
> What do you think? Should we drop both versions? Drop only the 2.x
> connector? Or keep them both?
>
> Best,
>
> Dawid
>
>


Re: [DISCUSS] Drop connectors for Elasticsearch 2.x and 5.x

2020-02-10 Thread Benchao Li
+1 for dropping 2.x - 5.x.

FYI currently only 6.x and 7.x ES Connectors are supported by table api.

Flavio Pompermaier  于2020年2月10日周一 下午10:03写道:

> +1 for dropping all Elasticsearch connectors < 6.x
>
> On Mon, Feb 10, 2020 at 2:45 PM Dawid Wysakowicz 
> wrote:
>
> > Hi all,
> >
> > As described in this https://issues.apache.org/jira/browse/FLINK-11720
> > ticket our elasticsearch 5.x connector does not work out of the box on
> > some systems and requires a version bump. This also happens for our e2e.
> > We cannot bump the version in es 5.x connector, because 5.x connector
> > shares a common class with 2.x that uses an API that was replaced in 5.2.
> >
> > Both versions are already long eol: https://www.elastic.co/support/eol
> >
> > I suggest to drop both connectors 5.x and 2.x. If it is too much to drop
> > both of them, I would strongly suggest dropping at least 2.x connector
> > and update the 5.x line to a working es client module.
> >
> > What do you think? Should we drop both versions? Drop only the 2.x
> > connector? Or keep them both?
> >
> > Best,
> >
> > Dawid
> >
> >
>


-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: [DISCUSS] Drop connectors for Elasticsearch 2.x and 5.x

2020-02-10 Thread Itamar Syn-Hershko
+1 for dropping old versions because of jar hells etc. However, in the
wild there are still a lot of 2.x clusters and definitely 5.x clusters that
are having a hard time upgrading. We know because we assist those on a
daily basis.

It is very easy to create an HTTP based connector that works with all ES
versions, though. As Elasticsearch consultants and experts we have done
that many times before. For example see this simplified client that has
zero dependencies and can be easily brought in to Flink to use as a sink
for all ES versions:
https://github.com/BigDataBoutique/log4j2-elasticsearch-http/blob/master/src/main/java/com/bigdataboutique/logging/log4j2/ElasticsearchHttpClient.java

Will be happy to assist in such effort

On Mon, Feb 10, 2020 at 3:45 PM Dawid Wysakowicz 
wrote:

> Hi all,
>
> As described in this https://issues.apache.org/jira/browse/FLINK-11720
> ticket our elasticsearch 5.x connector does not work out of the box on
> some systems and requires a version bump. This also happens for our e2e.
> We cannot bump the version in es 5.x connector, because 5.x connector
> shares a common class with 2.x that uses an API that was replaced in 5.2.
>
> Both versions are already long eol: https://www.elastic.co/support/eol
>
> I suggest to drop both connectors 5.x and 2.x. If it is too much to drop
> both of them, I would strongly suggest dropping at least 2.x connector
> and update the 5.x line to a working es client module.
>
> What do you think? Should we drop both versions? Drop only the 2.x
> connector? Or keep them both?
>
> Best,
>
> Dawid
>
>
>

-- 

Itamar Syn-Hershko
CTO, Founder

ita...@bigdataboutique.com
https://bigdataboutique.com





Re: [DISCUSS] Drop connectors for Elasticsearch 2.x and 5.x

2020-02-10 Thread Robert Metzger
Thanks for starting this discussion!

+1 to drop both

On Mon, Feb 10, 2020 at 2:45 PM Dawid Wysakowicz 
wrote:

> Hi all,
>
> As described in this https://issues.apache.org/jira/browse/FLINK-11720
> ticket our elasticsearch 5.x connector does not work out of the box on
> some systems and requires a version bump. This also happens for our e2e.
> We cannot bump the version in es 5.x connector, because 5.x connector
> shares a common class with 2.x that uses an API that was replaced in 5.2.
>
> Both versions are already long eol: https://www.elastic.co/support/eol
>
> I suggest to drop both connectors 5.x and 2.x. If it is too much to drop
> both of them, I would strongly suggest dropping at least 2.x connector
> and update the 5.x line to a working es client module.
>
> What do you think? Should we drop both versions? Drop only the 2.x
> connector? Or keep them both?
>
> Best,
>
> Dawid
>
>
>


[DISCUSS] Drop connectors for Elasticsearch 2.x and 5.x

2020-02-10 Thread Dawid Wysakowicz
Hi all,

As described in this https://issues.apache.org/jira/browse/FLINK-11720
ticket our elasticsearch 5.x connector does not work out of the box on
some systems and requires a version bump. This also happens for our e2e.
We cannot bump the version in es 5.x connector, because 5.x connector
shares a common class with 2.x that uses an API that was replaced in 5.2.

Both versions have long been EOL: https://www.elastic.co/support/eol

I suggest to drop both connectors 5.x and 2.x. If it is too much to drop
both of them, I would strongly suggest dropping at least 2.x connector
and update the 5.x line to a working es client module.

What do you think? Should we drop both versions? Drop only the 2.x
connector? Or keep them both?

Best,

Dawid






Flink connect hive with hadoop HA

2020-02-10 Thread sunfulin
Hi, guys
I am using Flink 1.10 and testing functional cases with Hive integration: Hive
1.1.0-cdh5.3.0 with Hadoop HA enabled. Running the Flink job I can see a
successful connection with the Hive metastore, but cannot read table data; it
fails with this exception:


java.lang.IllegalArgumentException: java.net.UnknownHostException: nameservice1
at 
org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:374)
at 
org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:310)
at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:668)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:604)
at 
org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:148)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2598)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)


I am running a standalone application. Looks like I am missing my Hadoop conf
file in my Flink job application classpath. Where should I configure this?

Re: Flink Elasticsearch upsert document in ES

2020-02-10 Thread Apoorv Upadhyay
I have tried providing opType to the Elasticsearch builder; I am getting an
error message "document already exists" on my console, but it still updates
the value in Elasticsearch:

val jsonString = write(record)
val rqst: IndexRequest = Requests.indexRequest
  .index(parameter.get("esIndexName"))
  .`type`(parameter.get("esIndexType"))
  .id(record.getApi_key + "_" + record.getOrder_id)
  .source(jsonString, XContentType.JSON)
  .opType(OpType.CREATE)


On Mon, Feb 10, 2020 at 1:42 PM Itamar Syn-Hershko <
ita...@bigdataboutique.com> wrote:

> Hi ApoorvK,
>
> Elasticsearch supports "create" mode while indexing. By default indexing
> will overwrite documents with the same ID, but you can tell ES to refuse
> overwriting. See op_type in
> https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html#docs-index-api-query-params
> .
>
> Looking at the Elasticsearch Sink, it doesn't seem like it's implemented
> currently, but it should be relatively easy to add.
>
> On Mon, Feb 10, 2020 at 9:26 AM ApoorvK 
> wrote:
>
>> Team,
>> Presently I have added elasticsearch as a sink to a stream and inserting
>> the
>> json data, the problem is when I restore the application in case of crash
>> it
>> reprocess the data in between (meanwhile a backend application updates the
>> document in ES) and flink reinsert the document in ES and all update to ES
>> are lost .
>>
>> I am trying for a update or insert in case document not found or do not
>> insert if document is already there.
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>
>
> --
>
> Itamar Syn-Hershko
> CTO, Founder
>
> ita...@bigdataboutique.com
> https://bigdataboutique.com
> 
> 
> 
>


[VOTE] Release Flink Python API(PyFlink) 1.9.2 to PyPI, release candidate #1

2020-02-10 Thread jincheng sun
Hi everyone,

Please review and vote on the release candidate #1 for the PyFlink version
1.9.2, as follows:

[ ] +1, Approve the release
[ ] -1, Do not approve the release (please provide specific comments)

The complete staging area is available for your review, which includes:

* the official Apache binary convenience releases to be deployed to
dist.apache.org [1], which are signed with the key with fingerprint
8FEA1EE9D0048C0CCC70B7573211B0703B79EA0E [2] and built from source code [3].

The vote will be open for at least 72 hours. It is adopted by majority
approval, with at least 3 PMC affirmative votes.

Thanks,
Jincheng

[1] https://dist.apache.org/repos/dist/dev/flink/flink-1.9.2-rc1/
[2] https://dist.apache.org/repos/dist/release/flink/KEYS
[3] https://github.com/apache/flink/tree/release-1.9.2


Re: [Help] Anyone know where I can find performance test result?

2020-02-10 Thread Khachatryan Roman
+ user@flink.apache.org (re-adding)

If you have a PR and would like to check the performance, you can reach out to
Flink committers to see the results at http://codespeed.dak8s.net:8000/

This UI uses https://github.com/tobami/codespeed
So you can also set it up in your environment.

Regards,
Roman


On Mon, Feb 10, 2020 at 10:10 AM 闫旭  wrote:

> Yes, it’s the one
>
> On Feb 10, 2020, at 5:08 PM, Khachatryan Roman <
> khachatryan.ro...@gmail.com> wrote:
>
> Hi Xu Yan,
>
> Do you mean flink-benchmarks repo?
>
> Regards,
> Roman
>
>
> On Mon, Feb 10, 2020 at 4:18 AM 闫旭  wrote:
>
>> Hi there,
>>
>> I am just exploring the Apache Flink git repo and found the performance
>> tests. I have already tested them on my local machine; I'm wondering if we
>> have online results?
>>
>> Thanks
>>
>> Regards
>>
>> Xu Yan
>>
>
>


Re: [Help] Anyone know where I can find performance test result?

2020-02-10 Thread Khachatryan Roman
Hi Xu Yan,

Do you mean flink-benchmarks repo?

Regards,
Roman


On Mon, Feb 10, 2020 at 4:18 AM 闫旭  wrote:

> Hi there,
>
> I am just exploring the Apache Flink git repo and found the performance
> tests. I have already tested them on my local machine; I'm wondering if we
> have online results?
>
> Thanks
>
> Regards
>
> Xu Yan
>


Re: Flink HA for Job Cluster

2020-02-10 Thread KristoffSC
Thanks you both for answers.

So I just want to get this right.

Can I achieve HA for the Job Cluster Docker config by having the ZooKeeper
quorum configured as mentioned in [1] (with S3 and ZooKeeper)?

I assume I need to modify the default Job Cluster config to match the [1]
setup, roughly like the snippet below.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/jobmanager_high_availability.html
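
Concretely, something like this in flink-conf.yaml is what I have in mind
(ZooKeeper hosts, S3 bucket and cluster id are placeholders for our setup):

high-availability: zookeeper
high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
high-availability.storageDir: s3://my-bucket/flink/ha
high-availability.cluster-id: /my-job-cluster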



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink Elasticsearch upsert document in ES

2020-02-10 Thread Itamar Syn-Hershko
Hi ApoorvK,

Elasticsearch supports "create" mode while indexing. By default indexing
will overwrite documents with the same ID, but you can tell ES to refuse
overwriting. See op_type in
https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html#docs-index-api-query-params
.

Looking at the Elasticsearch Sink, it doesn't seem like it's implemented
currently, but it should be relatively easy to add.

On Mon, Feb 10, 2020 at 9:26 AM ApoorvK 
wrote:

> Team,
> Presently I have added elasticsearch as a sink to a stream and inserting
> the
> json data, the problem is when I restore the application in case of crash
> it
> reprocess the data in between (meanwhile a backend application updates the
> document in ES) and flink reinsert the document in ES and all update to ES
> are lost .
>
> I am trying for a update or insert in case document not found or do not
> insert if document is already there.
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


-- 

Itamar Syn-Hershko
CTO, Founder

ita...@bigdataboutique.com
https://bigdataboutique.com





Flink Minimal requirements

2020-02-10 Thread KristoffSC
Hi all,
well, this may be a bit of a strange question, but are there any minimal
machine requirements (memory size, CPU, etc.) and non-functional requirements
(number of nodes, network ports, etc.) for Flink?

I know it all boils down to what my deployed Job will be, but if we just
could put this aside for a moment and focus on a bare minimum just for
Flink.

Probably we can say that Flink requires a minimum of 2 nodes, right?
What about the minimal memory needed for the Flink runtime? How many threads
does Flink's runtime use?

Any thought about this one?

Thanks,
Krzysztof



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/