Re: Getting Beam(Python)-on-Flink-on-k8s to work

2020-08-27 Thread Sam Bourne
Hi Eugene!

I’m struggling to find complete documentation on how to do this. There
seems to be lots of conflicting or incomplete information: several ways to
deploy Flink, several ways to get Beam working with it, bizarre
StackOverflow questions, and no documentation explaining a complete working
example.

This *is* possible, and I went through all the same frustrations of sparse
and confusing documentation. I’m glossing over a lot of details, but the
key thing was setting up the Flink taskmanager(s) to run Docker. This
requires running Docker-in-Docker, as the taskmanager itself is a Docker
container in k8s.

First create a custom flink container with docker:

# docker-flink Dockerfile

FROM flink:1.10
# install docker
RUN apt-get ...

Then set up the taskmanager deployment to use a sidecar docker-in-docker
(dind) service. This dind service is where the Python SDK harness container
actually runs.

kind: Deployment
...
  containers:
  - name: docker
    image: docker:19.03.5-dind
    ...
  - name: taskmanager
    image: myregistry:5000/docker-flink:1.10
    env:
    - name: DOCKER_HOST
      value: tcp://localhost:2375
...

I quickly threw all these pieces together in a repo here:
https://github.com/sambvfx/beam-flink-k8s

I added a working step-by-step guide (via minikube) to the README to prove
to myself that I didn’t miss anything, but feel free to submit PRs if you
want to add anything useful.

The documents you linked are very informative. It would be great to
aggregate all this into digestible documentation. Let me know if you have
any further questions!

Cheers,
Sam

On Thu, Aug 27, 2020 at 10:25 AM Eugene Kirpichov 
wrote:

> Hi Kyle,
>
> Thanks for the response!
>
> On Wed, Aug 26, 2020 at 5:28 PM Kyle Weaver  wrote:
>
>> > - With the Flink operator, I was able to submit a Beam job, but hit the
>> issue that I need Docker installed on my Flink nodes. I haven't yet tried
>> changing the operator's yaml files to add Docker inside them.
>>
>> Running Beam workers via Docker on the Flink nodes is not recommended
>> (and probably not even possible), since the Flink nodes are themselves
>> already running inside Docker containers. Running workers as sidecars
>> avoids that problem. For example:
>> https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/master/examples/beam/with_job_server/beam_flink_cluster.yaml#L17-L20
>>
> The main problem with the sidecar approach is that I can't use the Flink
> cluster as a "service" for anybody to submit their jobs with custom
> containers - the container version is fixed.
> Do I understand it correctly?
> Seems like the Docker-in-Docker approach is viable, and is mentioned in
> the Beam Flink K8s design doc.
>
>
>> > I also haven't tried this
>> 
>>  yet
>> because it implies submitting jobs using "kubectl apply"  which is weird -
>> why not just submit it through the Flink job server?
>>
>> I'm guessing it goes through k8s for monitoring purposes. I see no reason
>> it shouldn't be possible to submit to the job server directly through
>> Python, network permitting, though I haven't tried this.
>>
>>
>>
>> On Wed, Aug 26, 2020 at 4:10 PM Eugene Kirpichov 
>> wrote:
>>
>>> Hi folks,
>>>
>>> I'm still working with Pachama  right now; we
>>> have a Kubernetes Engine cluster on GCP and want to run Beam Python batch
>>> pipelines with custom containers against it.
>>> Flink and Cloud Dataflow are the two options; Cloud Dataflow doesn't
>>> support custom containers for batch pipelines yet, so we're going with Flink.
>>>
>>> I'm struggling to find complete documentation on how to do this. There
>>> seems to be lots of conflicting or incomplete information: several ways to
>>> deploy Flink, several ways to get Beam working with it, bizarre
>>> StackOverflow questions, and no documentation explaining a complete working
>>> example.
>>>
>>> == My requests ==
>>> * Could people briefly share their working setup? Would be good to know
>>> which directions are promising.
>>> * It would be particularly helpful if someone could volunteer an hour of
>>> their time to talk to me about their working Beam/Flink/k8s setup. It's for
>>> a good cause (fixing the planet :) ) and on my side I volunteer to write up
>>> the findings to share with the community so others suffer less.
>>>
>>> == Appendix: My findings so far ==
>>> There are multiple ways to deploy Flink on k8s:
>>> - The GCP marketplace Flink operator
>>> 
>>>  (couldn't
>>> get it to work) and the respective CLI version
>>>  (buggy,
>>> but I got it working)
>>> - https://github.com/lyft/flinkk8soperator (haven't tried)
>>> - Flink's native k

Handshake_failure running a Dataflow pipeline

2020-08-27 Thread bits horoscope
Hello, Apache Beam community!

Hope everything goes ok at this time.

I'm writing to ask for your guidance and help. I have been facing some
problems accessing HTTPS resources from a pipeline deployed in Dataflow.
The problem occurs only when I run with the DataflowRunner; the
DirectRunner works fine.

The associated exception is handshake_failure. Stack trace:
[sun.security.ssl.Alerts.getSSLException(Alerts.java:192),
sun.security.ssl.Alerts.getSSLException(Alerts.java:154),
sun.security.ssl.SSLSocketImpl.recvAlert(SSLSocketImpl.java:2033),
sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:1135),
sun.security.ssl.SSLSocketImpl.performInitialHandshake(SSLSocketImpl.java:1385),
sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1413)

I found this post on Stack Overflow, which describes my problem perfectly:
https://stackoverflow.com/questions/59128640/sslhandshakeexception-when-running-apache-beam-pipeline-in-dataflow,
but the proposed solution doesn't work for me. Maybe I'm missing something.

Basically I changed the Apache HttpClient to the Google HttpClient, but it
still isn't working. Here is the code I've tried.

private void outputProcess(ProcessContext pc, T img) throws Exception {
    StomachStreamerOptions opts =
        pc.getPipelineOptions().as(StomachStreamerOptions.class);

    NetHttpTransport.Builder builder = new NetHttpTransport.Builder();
    builder.setSslSocketFactory(createSslSocketFactory());

    JsonFactory JSON_FACTORY = new JacksonFactory();

    String urlImage = img.getURLImage();
    HttpRequestFactory requestFactory = builder.build().createRequestFactory(
        new HttpRequestInitializer() {
            @Override
            public void initialize(HttpRequest request) {
                request.setParser(new JsonObjectParser(JSON_FACTORY));
            }
        });

    GenericUrl url = new GenericUrl(urlImage);

    HttpRequest request = requestFactory.buildGetRequest(url);

    HttpHeaders headers = new HttpHeaders();
    headers.setUserAgent(opts.getUserAgentImages());
    request.setHeaders(headers);

    try {
        HttpResponse res = request.execute();
        this.outputProcess(pc, img, res);
    } catch (Exception e) {
        reportError(e);
    }
}


private static SSLSocketFactory createSslSocketFactory() throws Exception {
    TrustManager[] byPassTrustManagers = new TrustManager[] {
        new X509TrustManager() {
            public X509Certificate[] getAcceptedIssuers() {
                return new X509Certificate[0];
            }

            public void checkClientTrusted(X509Certificate[] chain,
                String authType) {
            }

            public void checkServerTrusted(X509Certificate[] chain,
                String authType) {
            }
        }
    };
    SSLContext sslContext = SSLContext.getInstance("TLSv1.2");
    sslContext.init(null, byPassTrustManagers, new SecureRandom());
    return sslContext.getSocketFactory();
}


Things I've tried:

- Apache HttpClient 4.5.1
- Google HttpClient 1.11
- Skipping certificate validation (permit all, just for testing. Not
working.)

I'm starting to think that maybe the problem is not in my code but
somewhere beyond it: the infrastructure, or a cipher suite that isn't
allowed by Dataflow... I don't know, it's just a guess.
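
One way I could test that guess is to log what the worker JVM actually
offers and diff the output between the DirectRunner and Dataflow, e.g. with
a throwaway DoFn like this sketch (the class and output wording are made
up):

import java.util.Arrays;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLParameters;
import org.apache.beam.sdk.transforms.DoFn;

// Hypothetical diagnostic: emit the TLS protocols and cipher suites that the
// default SSLContext offers, so the Dataflow worker JVM can be compared with
// the local JVM where the DirectRunner works.
public class TlsDiagnosticsFn extends DoFn<String, String> {
  @ProcessElement
  public void processElement(ProcessContext c) throws Exception {
    SSLContext ctx = SSLContext.getDefault();
    SSLParameters supported = ctx.getSupportedSSLParameters();
    SSLParameters enabled = ctx.getDefaultSSLParameters();
    c.output("Supported protocols: " + Arrays.toString(supported.getProtocols()));
    c.output("Enabled protocols: " + Arrays.toString(enabled.getProtocols()));
    c.output("Enabled cipher suites: " + Arrays.toString(enabled.getCipherSuites()));
  }
}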

What I'm using:
- Java 8 (1.8.0_144)
- Apache Beam 2.9 (hehe, I'm planning to upgrade it soon)

Thanks for reading my question. I'd appreciate any idea that can point me
in the right direction.

Andy


Re: How to integrate Beam SQL windowing query with KafkaIO?

2020-08-27 Thread Rui Wang
Glad it worked! So it sounds like the data was being dropped because it was
considered late, and `.withAllowedLateness()` allowed it to be emitted.
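
For reference, the TestStream approach I mentioned earlier can be sketched
roughly like this (the types, values, and timings below are made up, not
your actual pipeline):

import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Instant;
import org.junit.Test;

public class SqlWindowTest {
  @Test
  public void windowEmitsWithTestStream() {
    // Elements carry explicit timestamps and the watermark is advanced past
    // the end of a 1-minute window, so window emission is deterministic.
    TestStream<KV<String, Long>> source =
        TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of()))
            .addElements(
                TimestampedValue.of(KV.of("key", 1L), new Instant(0L)),
                TimestampedValue.of(KV.of("key", 2L), new Instant(30_000L)))
            .advanceWatermarkTo(new Instant(61_000L))
            .advanceWatermarkToInfinity();
    // p.apply(source) would then feed the same windowing + SqlTransform
    // under test, followed by PAssert on the output and p.run().
  }
}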


-Rui

On Thu, Aug 27, 2020 at 10:09 AM Minreng Wu  wrote:

> Hi Rui,
>
> Thanks for your advice!
>
> After reading Chapters 2 & 3 of *Streaming Systems* and some other
> materials, I eventually made it work! It indeed turned out to be an issue
> of not setting the trigger correctly. Previously, I didn't set the trigger
> & watermark, so the default settings were used. After I added
> `.withAllowedLateness()`, it correctly materializes the window output as
> expected. Thank you so much for your help!
>
> Thanks & Regards,
> Minreng
>
>
> On Mon, Aug 24, 2020 at 1:58 PM Rui Wang  wrote:
>
>> Hi,
>>
>> I checked the query in your SO question and I think the SQL usage is
>> correct.
>>
>> My current guess is that the problem is how the watermark is generated and
>> advanced in KafkaIO. Either the watermark didn't pass the end of your SQL
>> window for aggregation, or the data was lagging behind the watermark and so
>> was considered late.
>>
>> One way to verify this is to use TestStream as the source to evaluate your
>> pipeline and see whether it works well.
>>
>> -Rui
>>
>> On Mon, Aug 24, 2020 at 11:06 AM Minreng Wu  wrote:
>>
>>> Hi contributors,
>>>
>>> Sorry to bother you! I met a problem when I was trying to apply a
>>> windowing aggregation Beam SQL query to a Kafka input source.
>>>
>>> The details of the question are in the following link:
>>> https://stackoverflow.com/questions/63566057/how-to-integrate-beam-sql-windowing-query-with-kafkaio.
>>> And the version of the Beam Java SDK I used is *2.23.0*
>>>
>>> Really appreciate your help and advice! Stay safe and happy!
>>>
>>> Thanks and regards,
>>> Minreng
>>>
>>


Re: Getting Beam(Python)-on-Flink-on-k8s to work

2020-08-27 Thread Eugene Kirpichov
Hi Kyle,

Thanks for the response!

On Wed, Aug 26, 2020 at 5:28 PM Kyle Weaver  wrote:

> > - With the Flink operator, I was able to submit a Beam job, but hit the
> issue that I need Docker installed on my Flink nodes. I haven't yet tried
> changing the operator's yaml files to add Docker inside them.
>
> Running Beam workers via Docker on the Flink nodes is not recommended (and
> probably not even possible), since the Flink nodes are themselves already
> running inside Docker containers. Running workers as sidecars avoids that
> problem. For example:
> https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/master/examples/beam/with_job_server/beam_flink_cluster.yaml#L17-L20
>
The main problem with the sidecar approach is that I can't use the Flink
cluster as a "service" for anybody to submit their jobs with custom
containers - the container version is fixed.
Do I understand it correctly?
Seems like the Docker-in-Docker approach is viable, and is mentioned in
the Beam Flink K8s design doc.


> > I also haven't tried this
> 
>  yet
> because it implies submitting jobs using "kubectl apply"  which is weird -
> why not just submit it through the Flink job server?
>
> I'm guessing it goes through k8s for monitoring purposes. I see no reason
> it shouldn't be possible to submit to the job server directly through
> Python, network permitting, though I haven't tried this.
>
>
>
> On Wed, Aug 26, 2020 at 4:10 PM Eugene Kirpichov 
> wrote:
>
>> Hi folks,
>>
>> I'm still working with Pachama  right now; we have
>> a Kubernetes Engine cluster on GCP and want to run Beam Python batch
>> pipelines with custom containers against it.
>> Flink and Cloud Dataflow are the two options; Cloud Dataflow doesn't
>> support custom containers for batch pipelines yet, so we're going with Flink.
>>
>> I'm struggling to find complete documentation on how to do this. There
>> seems to be lots of conflicting or incomplete information: several ways to
>> deploy Flink, several ways to get Beam working with it, bizarre
>> StackOverflow questions, and no documentation explaining a complete working
>> example.
>>
>> == My requests ==
>> * Could people briefly share their working setup? Would be good to know
>> which directions are promising.
>> * It would be particularly helpful if someone could volunteer an hour of
>> their time to talk to me about their working Beam/Flink/k8s setup. It's for
>> a good cause (fixing the planet :) ) and on my side I volunteer to write up
>> the findings to share with the community so others suffer less.
>>
>> == Appendix: My findings so far ==
>> There are multiple ways to deploy Flink on k8s:
>> - The GCP marketplace Flink operator
>> 
>>  (couldn't
>> get it to work) and the respective CLI version
>>  (buggy,
>> but I got it working)
>> - https://github.com/lyft/flinkk8soperator (haven't tried)
>> - Flink's native k8s support
>> 
>>  (super
>> easy to get working)
>> 
>>
>> I confirmed that my Flink cluster was operational by running a simple
>> Wordcount job, initiated from my machine. However I wasn't yet able to get
>> Beam working:
>>
>> - With the Flink operator, I was able to submit a Beam job, but hit the
>> issue that I need Docker installed on my Flink nodes. I haven't yet tried
>> changing the operator's yaml files to add Docker inside them. I also
>> haven't tried this
>> 
>> yet because it implies submitting jobs using "kubectl apply"  which is
>> weird - why not just submit it through the Flink job server?
>>
>> - With Flink's native k8s support, I tried two things:
>>   - Creating a fat portable jar using  --output_executable_path. The jar
>> is huge (200+MB) and takes forever to upload to my Flink cluster - this is
>> a non-starter. But if I actually upload it, then I hit the same issue with
>> lacking Docker. Haven't tried fixing it yet.
>>   - Simply running my pipeline --runner=FlinkRunner
>> --environment_type=DOCKER --flink_master=$PUBLIC_IP:8081. The Java process
>> appears to send 1+GB of data to somewhere, but the job never even starts.
>>
>> I looked at a few conference talks:
>> -
>> https://www.cncf.io/wp-content/uploads/2020/02/CNCF-Webinar_-Apache-Flink-on-Kubernetes-Operator-1.pdf
>> - seems to imply that I need to add a Beam worker "sidecar" to the Flink
>> workers; and that I need to submit m

Is KafkaIO KafkaWriter stateless

2020-08-27 Thread Eleanore Jin
Hi all,

Just would like to confirm: KafkaWriter has no state, so that means when
checkpointing is enabled, no state will be checkpointed from KafkaWriter?

Thanks a lot!
Eleanore


Re: [Java - Beam Schema] Manually Generating a Beam Schema for a POJO class

2020-08-27 Thread Kaymak, Tobias
Hi Brian,

Thank you for opening the issue! My current workaround is to generate a
BigQuery schema with helper functions I already have (since I am writing to
BigQuery in the end in my sink). I still have the Beam Schema functions in
the code, but I currently don't use them, as I couldn't get them working in
time for a company-internal demo. (So I'm basically following your advice
of avoiding the ProtoMessageSchema.)
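
For anyone following along, the POJO-replication route would look roughly
like this (the field names below are invented, since the real
ArticleEnvelope proto isn't shown here):

import java.io.Serializable;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;

// Hypothetical POJO mirroring the proto's fields, so that both the outer
// EnrichedArticle and the inner type go through JavaFieldSchema and no
// ProtoMessageSchema is involved.
@DefaultSchema(JavaFieldSchema.class)
public class ArticleEnvelopePojo implements Serializable {
  public String articleId;  // invented field
  public String title;      // invented field
  public long publishedAt;  // invented field

  public ArticleEnvelopePojo() {}
}

The enrichment step would then map ArticleProto.ArticleEnvelope into this
POJO (e.g. in a MapElements or DoFn) before constructing EnrichedArticle.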

Best,
Tobi

On Wed, Aug 19, 2020 at 10:52 PM Brian Hulette  wrote:

> It looks like this is occurring because we don't actually support mixing
> SchemaProviders in nested types. The current SchemaProvider implementations
> only support nested types for homogenous types (e.g. an AutoValue with an
> AutoValue field). So when you use JavaFieldSchema as the SchemaProvider for
> the outer type (EnrichedArticle), it is also used recursively for the inner
> type (ArticleEnvelope), rather than using the registered ProtoMessageSchema.
>
> I filed BEAM-10765 [1] to add support for inferring schemas for
> non-homogenous types, I think it's something we should be able to support.
> I know it's been a while since you reported this, have you found a
> workaround in the meantime? Your best bet may be to avoid using
> ProtoMessageSchema for the inner class for now and use the same style of
> class for the outer and inner class by just creating a POJO or AutoValue
> that replicates the ArticleEnvelope class.
>
>
> Luke: Regarding recursive schemas, Reuven and I have had some discussions
> about it offline. I think he said it should be feasible but I don't know
> much beyond that.
>
> Brian
>
> [1] https://issues.apache.org/jira/browse/BEAM-10765
>
> On Tue, Jun 30, 2020 at 2:10 AM Kaymak, Tobias 
> wrote:
>
>> I want to make my example as simple as possible while also not leaving
>> out the details that might be the reason for the error. I don't think there
>> is any recursiveness.
>> I can also share the ArticleEnvelope Protobuf file If that helps. I've
>> tried to register the ArticleEnvelope schema like this:
>>
>> TestPipeline p = TestPipeline.create();
>> TypeDescriptor<ArticleProto.ArticleEnvelope> articleEnvelopeTypeDescriptor =
>> TypeDescriptor.of(ArticleProto.ArticleEnvelope.class);
>> Schema articleSchema =
>> new ProtoMessageSchema().schemaFor(TypeDescriptor.of(ArticleProto.ArticleEnvelope.class));
>>
>> SerializableFunction<ArticleProto.ArticleEnvelope, Row> articleEnvelopeToRow =
>> new ProtoMessageSchema().toRowFunction(TypeDescriptor.of(ArticleProto.ArticleEnvelope.class));
>>
>> SerializableFunction<Row, ArticleProto.ArticleEnvelope> articleEnvelopeFromRow =
>> new ProtoMessageSchema().fromRowFunction(TypeDescriptor.of(ArticleProto.ArticleEnvelope.class));
>>
>> p.getSchemaRegistry().registerSchemaForClass(ArticleProto.ArticleEnvelope.class,
>> articleSchema, articleEnvelopeToRow, articleEnvelopeFromRow);
>>
>> The problem is that even when I define and register it like above, as
>> soon as I annotate the class EnrichedArticle with 
>> @DefaultSchema(JavaFieldSchema.class)
>> I get:
>>
>> Caused by: java.lang.IllegalAccessError: tried to access method
>> ch.ricardo.schemas.data_intelligence.ArticleProto$ArticleEnvelope.<init>()V
>> from class
>> ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$gybLyTZO
>> at
>> ch.ricardo.schemas.data_intelligence.SchemaUserTypeCreator$SchemaCodeGen$gybLyTZO.create(Unknown
>> Source)
>> at
>> org.apache.beam.sdk.schemas.FromRowUsingCreator.fromRow(FromRowUsingCreator.java:92)
>> at
>> org.apache.beam.sdk.schemas.FromRowUsingCreator.fromValue(FromRowUsingCreator.java:110)
>> at
>> org.apache.beam.sdk.schemas.FromRowUsingCreator.fromRow(FromRowUsingCreator.java:87)
>> at
>> org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:62)
>> at
>> org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:45)
>> at org.apache.beam.sdk.schemas.SchemaCoder.decode(SchemaCoder.java:120)
>> at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
>> at
>> org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:115)
>> at
>> org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:98)
>> at
>> org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:92)
>> at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:141)
>> at
>> org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:115)
>> at
>> org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:46)
>> at
>> org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:112)
>> at
>> org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:300)
>> at
>> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
>> at
>> org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900(Simpl

Re: How to integrate Beam SQL windowing query with KafkaIO?

2020-08-27 Thread Minreng Wu
Hi Rui,

Thanks for your advice!

After reading Chapters 2 & 3 of *Streaming Systems* and some other materials,
I eventually made it work! It indeed turned out to be an issue of not
setting the trigger correctly. Previously, I didn't set the trigger &
watermark, so the default settings were used. After I added
`.withAllowedLateness()`, it correctly materializes the window output as
expected. Thank you so much for your help!
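
For reference, the windowing setup now looks roughly like the sketch below
(the durations are placeholders, `kafkaRows` stands in for the Row
PCollection coming out of KafkaIO, and it assumes the windowing is applied
on the PCollection before the SqlTransform rather than via TUMBLE inside
the SQL itself):

import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.joda.time.Duration;

// Explicitly configure the window, trigger, and allowed lateness instead of
// relying on the defaults.
static PCollection<Row> windowForSql(PCollection<Row> kafkaRows) {
  return kafkaRows.apply(
      Window.<Row>into(FixedWindows.of(Duration.standardMinutes(1)))
          .triggering(AfterWatermark.pastEndOfWindow())
          .withAllowedLateness(Duration.standardMinutes(5))
          .discardingFiredPanes());
}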

Thanks & Regards,
Minreng


On Mon, Aug 24, 2020 at 1:58 PM Rui Wang  wrote:

> Hi,
>
> I checked the query in your SO question and I think the SQL usage is
> correct.
>
> My current guess is that the problem is how the watermark is generated and
> advanced in KafkaIO. Either the watermark didn't pass the end of your SQL
> window for aggregation, or the data was lagging behind the watermark and so
> was considered late.
>
> One way to verify this is to use TestStream as the source to evaluate your
> pipeline and see whether it works well.
>
> -Rui
>
> On Mon, Aug 24, 2020 at 11:06 AM Minreng Wu  wrote:
>
>> Hi contributors,
>>
>> Sorry to bother you! I met a problem when I was trying to apply a
>> windowing aggregation Beam SQL query to a Kafka input source.
>>
>> The details of the question are in the following link:
>> https://stackoverflow.com/questions/63566057/how-to-integrate-beam-sql-windowing-query-with-kafkaio.
>> And the version of the Beam Java SDK I used is *2.23.0*
>>
>> Really appreciate your help and advice! Stay safe and happy!
>>
>> Thanks and regards,
>> Minreng
>>
>