Re: Beam with Flink runner - Issues when writing to S3 in Parquet Format

2021-09-14 Thread David Morávek
Hi Sandeep,

Jan has already provided pretty good guidelines for getting more context on
the issue ;)

Because this is not the first time, I would like to raise awareness that
it's not OK to send a user-related question to four Apache mailing lists
(that I know of). Namely:

- user@flink.apache.org
- dev@flink.apache.org
- user@beam.apache.org
- dev@beam.apache.org

Community focus is a very precious resource that should be used wisely.
All of these mailing lists answer many complex questions each day, and it's
very unfortunate if any of that work needs to be duplicated. Next time,
please direct Beam-related user questions solely to user@beam.apache.org.

Thanks for your understanding. You can consult the community guidelines [1][2]
if you are not sure where a particular question belongs.

[1] https://flink.apache.org/community.html#mailing-lists
[2] https://beam.apache.org/community/contact-us/

Best,
D.

On Tue, Sep 14, 2021 at 5:47 PM Jan Lukavský  wrote:

> Hi Sandeep,
> a few questions:
>  a) which state backend do you use for Flink?
>  b) what is your checkpointingInterval set for FlinkRunner?
>  c) how much data is there in your input Kafka topic(s)?
>
> FileIO has to buffer all elements per window (by default) into state, so
> this might create high pressure on the state backend and/or heap, which could
> result in suboptimal performance. Given the "connection loss" and timeout
> exceptions you describe, I'd suspect there might be a lot of GC pressure.
>
>  Jan
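
(For context: a minimal sketch of the write path Jan is describing. FileIO
buffers the elements of a window into state until the window fires, so window
size and shard count are the main levers. The sketch reuses the FileIO/ParquetIO
calls from the original message below; the schema, output path, and shard count
are placeholders, not values from this thread.)

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.parquet.ParquetIO;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

PCollection<GenericRecord> records = ...; // output of the Kafka-to-GenericRecord stage
Schema schema = ...;                      // the writer's Avro schema

records
    .apply(Window.<GenericRecord>into(FixedWindows.of(Duration.standardMinutes(5))))
    .apply(FileIO.<GenericRecord>write()
        .via(ParquetIO.sink(schema))
        .to("s3://bucket/output") // placeholder
        // each pane's elements sit in state until the window fires;
        // more shards spread that buffered state across more keys
        .withNumShards(20));
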
> On 9/14/21 5:20 PM, Kathula, Sandeep wrote:
>
> Hi,
>
> We have a simple Beam application which reads from Kafka, converts to
> Parquet, and writes to S3 with the Flink runner (Beam 2.29 and Flink 1.12). We
> have a fixed window of 5 minutes after conversion to
> PCollection<GenericRecord>, and then we write to S3. We have around 320
> columns in our data. Our intention is to write large files of size 128MB or
> more so that we won't have a small-file problem when reading back from
> Hive. But from what we observed, it is taking too much memory to write to S3
> (giving 8GB of heap is not enough to write 50 MB files, and it
> goes OOM). When I increase the heap to 32GB, it takes a lot of
> time to write records to S3.
>
> For instance it takes:
>
>
>
> 20 MB file - 30 sec
>
> 50 MB file - 1 min 16 sec
>
> 75 MB file - 2 min 15 sec
>
> 83 MB file - 2 min 40 sec
>
>
>
> Code block to write to S3:
>
> PCollection<GenericRecord> parquetRecord = …….
>
>
>
> parquetRecord.apply(FileIO.write()
>     .via(ParquetIO.sink(getOutput_schema()))
>     .to(outputPath.isEmpty() ? outputPath() : outputPath)
>     .withNumShards(5)
>     .withNaming(new CustomFileNaming("snappy.parquet")));
>
>
>
>
>
> We are also getting different exceptions like:
>
>
>
>1. UserCodeException:
>
>
>
> Caused by: org.apache.beam.sdk.util.UserCodeException:
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator
>
> at
> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
>
> at
> com.intuit.data.platform.process.thrivev2.parquet.ParquetWriter$DoFnInvoker.invokeProcessElement(Unknown
> Source)
>
> at
> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
>
> at
> org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
>
> at
> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
>
> at
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601)
>
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>
> at
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1068)
>
> at
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1022)
>
> at
> org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
>
> at
> org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
>
> at
> 

Re: [DISCUSS] Drop support for Flink 1.10

2021-05-31 Thread David Morávek
Hi,

+1, as we've agreed in the past to keep support for the three latest major
releases.

D.

On Mon, May 31, 2021 at 9:54 AM Jan Lukavský  wrote:

> Hi,
>
> +1 to remove the support for 1.10.
>
>  Jan
> On 5/28/21 10:00 PM, Ismaël Mejía wrote:
>
> Hello,
>
> With Beam support for Flink 1.13 just merged, it is time to discuss the
> end of support for Flink 1.10, following the agreed policy of supporting
> only the latest three Flink releases [1].
>
> I would like to propose that for Beam 2.31.0 we stop supporting Flink 1.10
> [2].
> I prepared a PR for this [3] but of course I wanted to bring the subject
> here
> (and to user@) for your attention and in case someone has a different
> opinion or
> reason to still support the older version.
>
> WDYT?
>
> Regards,
> Ismael
>
> [1]
> https://lists.apache.org/thread.html/rfb5ac9d889d0e3f4400471de3c25000a15352bde879622c899d97581%40%3Cdev.beam.apache.org%3E
> [2] https://issues.apache.org/jira/browse/BEAM-12281
> [3] https://github.com/apache/beam/pull/14906
>
>


Re: [DISCUSS] Drop support for Flink 1.8 and 1.9

2021-03-12 Thread David Morávek
+1

D.

On Thu, Mar 11, 2021 at 8:33 PM Ismaël Mejía  wrote:

> +user
>
> > Should we add a warning or something to 2.29.0?
>
> Sounds like a good idea.
>
>
>
>
> On Thu, Mar 11, 2021 at 7:24 PM Kenneth Knowles  wrote:
> >
> > Should we add a warning or something to 2.29.0?
> >
> > On Thu, Mar 11, 2021 at 10:19 AM Ismaël Mejía  wrote:
> >>
> >> Hello,
> >>
> >> We have been supporting older versions of Flink, as we agreed in previous
> >> discussions where we said we would be supporting only the latest three
> >> releases [1].
> >>
> >> I would like to propose that for Beam 2.30.0 we stop supporting Flink
> 1.8 and
> >> 1.9 [2].  I prepared a PR for this [3] but of course I wanted to bring
> the
> >> subject here (and to user@) for your attention and in case someone has
> a
> >> different opinion or reason to still support the older versions.
> >>
> >> WDYT?
> >>
> >> Regards,
> >> Ismael
> >>
> >> [1]
> https://lists.apache.org/thread.html/rfb5ac9d889d0e3f4400471de3c25000a15352bde879622c899d97581%40%3Cdev.beam.apache.org%3E
> >> [2] https://issues.apache.org/jira/browse/BEAM-11948
> >> [3] https://github.com/apache/beam/pull/14203
>


Re: Beam flink runner job not keeping up with input rate after downscaling

2020-08-16 Thread David Morávek
Hello Sandeep,

Are you seeing any skew in your data (affected TMs are receiving more data
than others)? How many partitions does your source topic have (this could
explain why some TMs would have more work to perform)?

Also, would it be possible to retry your test with the latest SDK?

D.

On Sun, Aug 16, 2020 at 6:44 AM Eleanore Jin  wrote:

> Hi Sandeep,
>
> As I am also exploring the Beam KafkaIO, just to share some of my
> thoughts.
> 1. My understanding is that, in order to guarantee no message loss, you will
> need to use KafkaExactlyOnceSink [1]. And it is not possible to relax this to
> at-least-once with the current KafkaIO.
> 2. When KafkaExactlyOnceSink is enabled, the checkpoint will include the
> offset from the source and all the messages between the last checkpoint and the
> current checkpoint.
> 3. Only after the checkpoint is completed will KafkaExactlyOnceSink
> start publishing the messages that have been checkpointed.
> 4. In case of failure while publishing these messages, they will
> be retried; a sequenceId is assigned to each message to determine
> which messages were published successfully and which need to be retried. E.g.,
> say messages 5-10 are in the checkpoint and only 5 and 6 were published
> successfully; then, when restarting from the checkpoint, only 7-10 will be
> published again.
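
(For contrast with the plain KafkaWriter path discussed below, enabling the EOS
sink looks roughly like this; the brokers, topic, serializers, shard count, and
sink group id are placeholders:)

import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.StringSerializer;

PCollection<KV<String, String>> records = ...;

records.apply(
    KafkaIO.<String, String>write()
        .withBootstrapServers("broker-1:9092")
        .withTopic("output-topic")
        .withKeySerializer(StringSerializer.class)
        .withValueSerializer(StringSerializer.class)
        // routes the write through KafkaExactlyOnceSink [1] instead of
        // KafkaWriter [2]; requires a runner with checkpointing enabled
        .withEOS(5, "eos-sink-group-id"));
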
>
> My question for your setup:
> if you just enable checkpointing and still use KafkaWriter [2], and your
> application is stateless, then the only state is the source offset.
> Consider the following scenario:
> offset 10 is checkpointed and the checkpoint succeeds, then the message with
> offset 10 fails to be published and the job is restarted; it will resume from the
> checkpoint and start from offset 11, so message 10 gets lost.
>
> Please correct me if I am missing anything.
>
> Thanks a lot!
> Eleanore
>
> [1]
> https://github.com/apache/beam/blob/release-2.23.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java
> [2]
> https://github.com/apache/beam/blob/release-2.23.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java
>
> On Tue, Aug 11, 2020 at 11:19 PM Eleanore Jin 
> wrote:
>
>>
>> Hi Sandeep,
>>
>> Thanks a lot for the information! I am on a similar track which requires
>> to scale up/down the stateless pipeline from a savepoint.
>>
>> It’s good to learn from your experience.
>>
>> Thanks!
>> Eleanore
>>
>> On Tue, Aug 11, 2020 at 10:21 AM Kathula, Sandeep <
>> sandeep_kath...@intuit.com> wrote:
>>
>>> Hi Eleanore,
>>>
>>> We are using at-least-once semantics when writing to Kafka. We are OK
>>> with duplicate messages.
>>>
>>> Thanks
>>>
>>> Sandeep Kathula
>>>
>>>
>>>
>>> From: Eleanore Jin 
>>> Date: Monday, August 10, 2020 at 11:32 AM
>>> To: "Kathula, Sandeep" 
>>> Cc: "user@beam.apache.org" , "Vora, Jainik" <
>>> jainik_v...@intuit.com>, "Benenson, Mikhail" <
>>> mikhail_benen...@intuit.com>, "Deshpande, Omkar" <
>>> omkar_deshpa...@intuit.com>, "LeVeck, Matt" 
>>> Subject: Re: Beam flink runner job not keeping up with input rate
>>> after downscaling
>>>
>>>
>>> Hi Sandeep,
>>>
>>>
>>>
>>> Thanks a lot for sharing! On a separate note, I see you are using
>>> KafkaIO.write, but not with EOS (exactly-once semantics). From my
>>> understanding, just enabling checkpointing will not be enough to guarantee
>>> no message loss? I pasted part of my DAG with KafkaIO EOS enabled. I
>>> also read from and write to Kafka with KafkaIO.
>>>
>>>
>>>
>>> Thanks a lot!
>>>
>>> Eleanore
>>>
>>>
>>>
>>> On Mon, Aug 10, 2020 at 11:07 AM Kathula, Sandeep <
>>> sandeep_kath...@intuit.com> wrote:
>>>
>>> Hi Eleanore,
>>>
>>> We are also observing that some task managers are able to
>>> keep up with the incoming load while a few task managers lag behind after
>>> starting from a savepoint with lower parallelism. Not all task managers are
>>> affected by this problem; we repeated this test multiple times to confirm.
>>>
>>>
>>>
>>> Thanks
>>>
>>> Sandeep Kathula
>>>
>>>
>>>
>>> *From: *"Kathula, Sandeep" 
>>> *Reply-To: *"user@beam.apache.org" 
>>> *Date: *Monday, August 10, 2020 at 11:04 AM
>>> *To: *"user@beam.apache.org" , "
>>> eleanore@gmail.com" 
>>> *Cc: *"Vora, Jainik" , "Benenson, Mikhail" <
>>> mikhail_benen...@intuit.com>, "Deshpande, Omkar" <
>>> omkar_deshpa...@intuit.com>, "LeVeck, Matt" 
>>> *Subject: *Re: Beam flink runner job not keeping up with input rate
>>> after downscaling
>>>
>>>
>>> Hi Eleanore,
>>>
>>> Our DAG:
>>>
>>> Source: Strip Metadata/EventBusIO.Read/Read Bytes From
>>> Kafka/Read(KafkaUnboundedSource) -> Flat Map -> Strip
>>> Metadata/EventBusIO.Read/MapElements/Map/ParMultiDo(Anonymous) -> Strip
>>> Metadata/EventBusIO.Read/Decode EB Bytes/ParMultiDo(EbExtractor) -> Strip
>>> Metadata/MapElements/Map/ParMultiDo(Anonymous) -> Filter Unreadeable
>>> 

Re: Flink Runner with RequiresStableInput fails after a certain number of checkpoints

2020-04-21 Thread David Morávek
Hi Stephen,

nice catch and awesome report! ;) This definitely needs a proper fix. I've
created a new JIRA to track the issue and will try to resolve it soon, as
this seems critical to me.

https://issues.apache.org/jira/browse/BEAM-9794

Thanks,
D.
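
(One detail worth noting about the report below, as an inference rather than
anything confirmed in this thread: the job dies near checkpoint 32,767, which is
Short.MAX_VALUE, and the IllegalArgumentException is thrown by a precondition in
Flink's OperatorBackendSerializationProxy, which serializes the number of
registered operator states as a short. That suggests the @RequiresStableInput
buffering registers a new operator state per checkpoint until the count
overflows that limit.)

public class CheckpointLimitHint {
  public static void main(String[] args) {
    // 32,767 is the largest state count a short-typed field can record,
    // matching the checkpoint at which the job below fails.
    System.out.println(Short.MAX_VALUE); // prints 32767
  }
}
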

On Mon, Apr 20, 2020 at 10:41 PM Stephen Patel 
wrote:

> I was able to reproduce this in a unit test:
>
>> @Test
>> public void test() throws InterruptedException, ExecutionException {
>>
>>   FlinkPipelineOptions options =
>>       PipelineOptionsFactory.as(FlinkPipelineOptions.class);
>>   options.setCheckpointingInterval(10L);
>>   options.setParallelism(1);
>>   options.setStreaming(true);
>>   options.setRunner(FlinkRunner.class);
>>   options.setFlinkMaster("[local]");
>>   options.setStateBackend(new MemoryStateBackend(Integer.MAX_VALUE));
>>
>>   Pipeline pipeline = Pipeline.create(options);
>>   pipeline
>>       .apply(Create.of((Void) null))
>>       .apply(
>>           ParDo.of(
>>               new DoFn<Void, Void>() {
>>
>>                 private static final long serialVersionUID = 1L;
>>
>>                 @RequiresStableInput
>>                 @ProcessElement
>>                 public void processElement() {}
>>               }));
>>   pipeline.run();
>> }
>>
>
> It took a while to get to checkpoint 32,767, but eventually it did, and it
> failed with the same error I listed above.
>
> On Thu, Apr 16, 2020 at 11:26 AM Stephen Patel 
> wrote:
>
>> I have a Beam Pipeline (2.14) running on Flink (1.8.0, emr-5.26.0) that
>> uses the RequiresStableInput feature.
>>
>> Currently it's configured to checkpoint once a minute, and after around
>> 32000-33000 checkpoints, it fails with:
>>
>>> 2020-04-15 13:15:02,920 INFO
>>>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
>>> checkpoint 32701 @ 1586956502911 for job 9953424f21e240112dd23ab4f8320b60.
>>> 2020-04-15 13:15:05,762 INFO
>>>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
>>> checkpoint 32701 for job 9953424f21e240112dd23ab4f8320b60 (795385496 bytes
>>> in 2667 ms).
>>> 2020-04-15 13:16:02,919 INFO
>>>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
>>> checkpoint 32702 @ 1586956562911 for job 9953424f21e240112dd23ab4f8320b60.
>>> 2020-04-15 13:16:03,147 INFO
>>>  org.apache.flink.runtime.executiongraph.ExecutionGraph-
>>>  (1/2) (f4737add01961f8b42b8eb4e791b83ba) switched from
>>> RUNNING to FAILED.
>>> AsynchronousException{java.lang.Exception: Could not materialize
>>> checkpoint 32702 for operator  (1/2).}
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
>>> at
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>> at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.lang.Exception: Could not materialize checkpoint 32702
>>> for operator  (1/2).
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
>>> ... 6 more
>>> Caused by: java.util.concurrent.ExecutionException:
>>> java.lang.IllegalArgumentException
>>> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>> at
>>> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:394)
>>> at
>>> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
>>> ... 5 more
>>> Caused by: java.lang.IllegalArgumentException
>>> at
>>> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
>>> at
>>> org.apache.flink.runtime.state.OperatorBackendSerializationProxy.<init>(OperatorBackendSerializationProxy.java:68)
>>> at
>>> org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:138)
>>> at
>>> org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108)
>>> at
>>> org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
>>> at 

Re: [Python] Read Hadoop Sequence File?

2019-07-02 Thread David Morávek
Hi, you can use SequenceFileSink and SequenceFileSource from the Cloud Bigtable
client. Those work nicely with FileIO.

https://github.com/googleapis/cloud-bigtable-client/blob/master/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/sequencefiles/SequenceFileSink.java
https://github.com/googleapis/cloud-bigtable-client/blob/master/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/sequencefiles/SequenceFileSource.java

It would be really cool to move these into Beam, but it's up to Googlers
to decide whether they want to donate this.

D.

On Tue, Jul 2, 2019 at 2:07 AM Shannon Duncan 
wrote:

> It's not outside the realm of possibilities. For now I've created an
> intermediary step: a Hadoop job that converts from sequence files to text files.
>
> Looking into better options.
>
> On Mon, Jul 1, 2019, 5:50 PM Chamikara Jayalath 
> wrote:
>
>> The Java SDK has HadoopFormatIO (formerly HadoopInputFormatIO), which you should
>> be able to use to read sequence files:
>> https://github.com/apache/beam/blob/master/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIO.java
>> I don't think there's a direct alternative for this for Python.
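
(A sketch of what the HadoopFormatIO route could look like, assuming a
SequenceFile of Text keys and values; the input path and key/value classes are
placeholders. The Writable types may still need translating, e.g. with
withKeyTranslation/withValueTranslation, to types with standard Beam coders.)

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;

Configuration conf = new Configuration();
conf.set("mapreduce.input.fileinputformat.inputdir", "gs://bucket/link/SequenceFile-*");
conf.setClass("mapreduce.job.inputformat.class", SequenceFileInputFormat.class, InputFormat.class);
conf.setClass("key.class", Text.class, Object.class);
conf.setClass("value.class", Text.class, Object.class);

Pipeline pipeline = Pipeline.create();
PCollection<KV<Text, Text>> rows =
    pipeline.apply(HadoopFormatIO.<Text, Text>read().withConfiguration(conf));
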
>>
>> Is it possible to write to a well-known format such as Avro instead of a
>> Hadoop specific format which will allow you to read from both
>> Dataproc/Hadoop and Beam Python SDK ?
>>
>> Thanks,
>> Cham
>>
>> On Mon, Jul 1, 2019 at 3:37 PM Shannon Duncan 
>> wrote:
>>
>>> That's a pretty big hole for a missing source/sink when looking at
>>> transitioning from Dataproc to Dataflow using GCS as a storage buffer instead
>>> of a traditional HDFS.
>>>
>>> From what I've been able to tell from the source code and documentation,
>>> Java is able to but Python is not?
>>>
>>> Thanks,
>>> Shannon
>>>
>>> On Mon, Jul 1, 2019 at 5:29 PM Chamikara Jayalath 
>>> wrote:
>>>
 I don't think we have a source/sink for reading Hadoop sequence files.
 Your best bet currently will probably be to use the FileSystem abstraction to
 create a file from a ParDo and read directly from there using a library
 that can read sequence files.

 Thanks,
 Cham

 On Mon, Jul 1, 2019 at 8:42 AM Shannon Duncan <
 joseph.dun...@liveramp.com> wrote:

> I'm wanting to read a Sequence/Map file from Hadoop stored on Google
> Cloud Storage via " gs://bucket/link/SequenceFile-* " with the Python
> SDK.
>
> I cannot locate any good adapters for this, and the one Hadoop
> Filesystem reader seems to only read from an "hdfs://" URL.
>
> I'm wanting to use Dataflow and GCS exclusively to start mixing in
> Beam pipelines with our current Hadoop Pipelines.
>
> Is this a feature that is supported or will be supported in the future?
> Does anyone have any good suggestions for doing this performantly?
>
> I'd also like to be able to write back out to a SequenceFile if
> possible.
>
> Thanks!
>
>


Re: [ANNOUNCE] Spark portable runner (batch) now available for Java, Python, Go

2019-06-19 Thread David Morávek
Great job Kyle, thanks for pushing this forward!

Sent from my iPhone

> On 18 Jun 2019, at 12:58, Ismaël Mejía  wrote:
> 
> I have been thrilled to watch this happen from the front row.
> 
> Thanks a lot Kyle. Excellent work!
> 
> 
> On Mon, Jun 17, 2019 at 9:15 PM Ankur Goenka  wrote:
> >
> > Thanks Kyle!
> > This is a great addition towards supporting portability on Beam.
> >
> > On Mon, Jun 17, 2019 at 9:21 AM Ahmet Altay  wrote:
> >>
> >> Thank you Kyle! This is great news :)
> >>
> >> On Mon, Jun 17, 2019 at 6:40 AM Andres Angel 
> >>  wrote:
> >>>
> >>> Really great achievement!!! congrats.
> >>>
> >>> On Mon, Jun 17, 2019 at 7:49 AM Robert Bradshaw  
> >>> wrote:
> 
>  Excellent work, very excited to see this!
> 
>  On Fri, Jun 14, 2019 at 11:02 PM Kyle Weaver  wrote:
> >
> > Hello Beamers,
> >
> > I'm happy to announce that the portable Spark runner is now mostly 
> > feature-complete [0] for BATCH processing (STREAMING is not yet 
> > available). This means you can run your new or existing Beam Python and 
> > Go pipelines using Apache Spark as the underlying execution engine.
> >
> > "Portable," you ask? Essentially, it shares a lot of the same code as 
> > the existing Spark runner, but also leverages Beam's portability APIs 
> > [1] to add Python and Go support, in addition to Java (note that the Go 
> > SDK itself is still considered experimental [2]).
> >
> > Instructions on how to run pipelines on the portable Spark runner are 
> > available on the Beam website [3].
> >
> > While we are passing Beam's fairly comprehensive test suites [4][5][6], 
> > the portable Spark runner has yet to be tested in a production 
> > environment, so please feel free to file a Jira and tag me if you have 
> > issues or feature requests (username: ibzib).
> >
> > Thanks,
> > Kyle
> >
> > [0] https://s.apache.org/apache-beam-portability-support-table
> > [1] https://beam.apache.org/roadmap/portability/
> > [2] 
> > https://lists.apache.org/thread.html/8f729da2d3009059d7a8b2d8624446be161700dcfa953939dd3530c6@%3Cdev.beam.apache.org%3E
> > [3] https://beam.apache.org/documentation/runners/spark/
> > [4] https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch
> > [5] https://builds.apache.org/job/beam_PostCommit_Python_VR_Spark/
> > [6] https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/
> >
> > Kyle Weaver | Software Engineer | github.com/ibzib | 
> > kcwea...@google.com | +1650203


Re: Is there a way to decide what RDDs get cached in the Spark Runner?

2019-05-16 Thread David Morávek
Hello Augusto,

This is a long-standing problem that is really hard to fix properly unless
the runner has a better understanding of what is actually happening in the
pipeline itself (that's what Jan already pointed out). I still think the
best approach, one that would require the fewest "knobs" for the user to tune,
would be sampling previous pipeline runs and reusing the gathered information
to decide what should be cached and how.

I think we can probably eliminate some of the major cases that could cause
trouble, e.g. when the DAG splits right after a shuffle, it's usually not worth
caching, as Spark can reuse the shuffle files, unless the computation is really
expensive (random-access DB joins) ...

D.

On Thu, May 16, 2019 at 9:38 AM Augusto Ribeiro 
wrote:

> Hi Robert,
>
> If Spark could decide, then it would be OK, but that's probably not a trivial
> problem, as Jan pointed out.
>
> In my case, the decision had to be made on the basis of the machines in
> the cluster. In principle, if you had enough machines (or larger ones) to hold
> everything in memory it wouldn't be a problem, but in reality we all have
> constraints on how many and what kind of machines to use.
>
> Best regards,
> Augusto
>
> > On 15 May 2019, at 10:39, Robert Bradshaw  wrote:
> >
> > Just to clarify, do you need direct control over what to cache, or
> > would it be OK to let Spark decide the minimal set of RDDs to cache as
> > long as we didn't cache all intermediates?
> >
> > From: Augusto Ribeiro 
> > Date: Wed, May 15, 2019 at 8:37 AM
> > To: 
> >
> >> Hi Kyle,
> >>
>> Thanks for the help. It seems like I have no choice other than to use
> Spark directly, since my job causes immense memory pressure if I can't
> decide what to cache.
> >>
> >> Best regards,
> >> Augusto
> >>
> >> On 14 May 2019, at 18:40, Kyle Weaver  wrote:
> >>
> >> Minor correction: Slack channel is actually #beam-spark
> >>
> >> Kyle Weaver | Software Engineer | github.com/ibzib |
> kcwea...@google.com | +1650203
> >>
> >>
> >> From: Kyle Weaver 
> >> Date: Tue, May 14, 2019 at 9:38 AM
> >> To: 
> >>
> >>> Hi Augusto,
> >>>
> >>> Right now the default behavior is to cache all intermediate RDDs that
> are consumed more than once by the pipeline. This can be disabled with
> `options.setCacheDisabled(true)` [1], but there is currently no way for the
> user to specify to the runner that it should cache certain RDDs, but not
> others.
> >>>
> >>> There has recently been some discussion on the Slack (#spark-beam)
> about implementing such a feature, but no concrete plans as of yet.
> >>>
> >>> [1]
> https://github.com/apache/beam/blob/81faf35c8a42493317eba9fa1e7b06fb42d54662/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java#L150
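
(For reference, wiring [1] into a pipeline looks roughly like this; a sketch,
with args and the rest of the pipeline left as placeholders:)

import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

SparkPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(SparkPipelineOptions.class);
// skip caching of intermediate RDDs entirely, even ones consumed more than once
options.setCacheDisabled(true);

Pipeline pipeline = Pipeline.create(options);
// ... build and run the pipeline ...
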
> >>>
> >>> Thanks
> >>>
> >>> Kyle Weaver | Software Engineer | github.com/ibzib |
> kcwea...@google.com | +1650203
> >>>
> >>>
> >>> From: augusto@gmail.com 
> >>> Date: Tue, May 14, 2019 at 5:01 AM
> >>> To: 
> >>>
>  Hi,
> 
>  I guess the title says it all: right now it seems like Beam caches
> all the intermediate RDD results for my pipeline when using the Spark
> runner, which leads to very inefficient memory usage. Is there any way to
> control this?
> 
>  Best regards,
>  Augusto
> >>
> >>
>
>


Re: kafka 0.9 support

2019-04-03 Thread David Morávek
I'd say that the APIs we use in KafkaIO have been pretty much stable since the 0.10
release; all the reflection-based compatibility adapters seem to be aimed at the
0.9 release (which is 8 major releases behind the current Kafka release).

We may take inspiration from Flink's Kafka connector
<https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html>:
they maintain a separate Maven artifact for each supported Kafka API. This
may be the best approach, as we can still share most of the codebase between
versions, have compile-time checks, and also run tests against all of the
supported versions.

I'm not really comfortable with the reflection-based adapters
<https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java>
as they seem fragile and don't provide compile-time checks.

On Tue, Apr 2, 2019 at 11:27 PM Austin Bennett 
wrote:

> I withdraw my concern -- I checked the info on the cluster I will eventually
> access. It is on 0.8, so I was speaking too soon. I can't speak for the rest
> of the user base.
>
> On Tue, Apr 2, 2019 at 11:03 AM Raghu Angadi  wrote:
>
>> Thanks to David Morávek for pointing out a possible improvement to KafkaIO
>> from dropping support for 0.9: it avoids having a second consumer just
>> to fetch the latest offsets for the backlog.
>>
>> Ideally we should drop 0.9 support in the next major release; in fact, it
>> would be better to drop all versions before 0.10.1 at the same time. This
>> would further reduce the reflection-based calls for supporting multiple
>> versions. If the users still on 0.9 could stay on the current stable release
>> of Beam, dropping support would not affect them. Otherwise, it would be good
>> to hear from them about how long we need to keep support for old versions.
>>
>> I don't think it is good idea to have multiple forks of KafkaIO in the
>> same repo. If we do go that route, we should fork the entire kafka
>> directory and rename the main class KafkaIO_Unmaintained :).
>>
>> IMHO, so far, the additional complexity for supporting these versions is not
>> that bad. Most of it is isolated to ConsumerSpEL.java & ProducerSpEL.java.
>> My first preference is dropping support for the deprecated versions (and
>> deprecating a few more versions, maybe up to the version that added
>> transactions, around 0.11.x I think).
>>
>> I haven't looked into what's new in Kafka 2.x. Are there any features
>> that KafkaIO should take advantage of? I have not noticed our existing code
>> breaking. We should certainly support the latest releases of Kafka.
>>
>> Raghu.
>>
>> On Tue, Apr 2, 2019 at 10:27 AM Mingmin Xu  wrote:
>>
>>>
>>> We're still using Kafka 0.10 a lot, similar to 0.9 IMO. Supporting
>>> multiple versions in KafkaIO is quite complex now, and it confuses users as
>>> to which is supported and which is not. I would prefer to support Kafka 2.0+
>>> only in the latest version. For old versions, there are some options:
>>> 1) document the Kafka versions Beam supports, like what we do in FlinkRunner;
>>> 2) maintain separate KafkaIOs for old versions.
>>>
>>> 1) would be easy to maintain, and I assume there should be no issue using
>>> Beam core 3.0 together with KafkaIO 2.0.
>>>
>>> Any thoughts?
>>>
>>> Mingmin
>>>
>>> On Tue, Apr 2, 2019 at 9:56 AM Reuven Lax  wrote:
>>>
>>>> KafkaIO is marked as Experimental, and the comment already warns that
>>>> 0.9 support might be removed. I think that if users still rely on Kafka 0.9,
>>>> we should leave a (renamed) fork of the IO in the tree for 0.9, but we can
>>>> definitely remove 0.9 support from the main IO if we want, especially if
>>>> it requires complicated changes to that IO. If we do, though, we should fail
>>>> with a clear error message telling users to use the Kafka 0.9 IO.
>>>>
>>>> On Tue, Apr 2, 2019 at 9:34 AM Alexey Romanenko <
>>>> aromanenko@gmail.com> wrote:
>>>>
>>>>> > How are multiple versions of Kafka supported? Are they all in one
>>>>> client, or is there a case for forks like ElasticSearchIO?
>>>>>
>>>>> They are supported in one client, but we have an additional “ConsumerSpEL”
>>>>> adapter which unifies interface differences among different Kafka client
>>>>> versions (mostly to support the old 0.9-0.10.0 ones).
>>>>>
>>>>> On the other hand, we warn users in the Javadoc of KafkaIO (which is
>>>>> Unstable, btw) with the following:
>>>>> “KafkaIO relies on kafka-clients for a

Re: joda-time dependency version

2019-03-23 Thread David Morávek
If there are no objections from dev@, I'll try to proceed with an upgrade
to the latest version <https://jira.apache.org/jira/browse/BEAM-6895>
(2.10.1).

Kenn, I've found your issue <https://jira.apache.org/jira/browse/BEAM-5827>
for joda-time vendoring; is it still relevant? The upgrade might cause a
breaking change, as joda-time is part of the user-facing API.

D.

On Thu, Mar 21, 2019 at 5:44 PM Kenneth Knowles  wrote:

> +dev@
>
> I don't know of any special reason we are using an old version.
>
> Kenn
>
> On Thu, Mar 21, 2019, 09:38 Ismaël Mejía  wrote:
>
>> Does anyone have any context on why we have such an old version of
>> Joda time (2.4, released in 2014!) and whether there is any possible issue
>> upgrading it? If not, maybe we can try to upgrade it.
>>
>> On Thu, Mar 21, 2019 at 5:35 PM Ismaël Mejía  wrote:
>> >
>> > Mmmm interesting issue. There is also a plan to use a vendored version
>> > of joda-time; not sure about the progress on that one.
>> > https://issues.apache.org/jira/browse/BEAM-5827
>> >
>> > For Beam 3 that's the idea, but so far there is no ETA for Beam 3.
>> > https://issues.apache.org/jira/browse/BEAM-5530
>> >
>> > On Thu, Mar 21, 2019 at 4:15 PM rahul patwari
>> >  wrote:
>> > >
>> > > Hi David,
>> > >
>> > > The only incompatibility we have come across is this:
>> > > We have some timestamp format conversions in our project, where we
>> are converting from one timestamp format to another.
>> > >
>> > > With joda-time 2.4:
>> > > If we convert "2019-03-15 13:56:12", which is in "yyyy-MM-dd HH:mm:ss"
>> format, to the "hh:mm:ss yy-MMM-dd z" format, the converted value is "01:56:12
>> 19-Mar-15 -07:00".
>> > >
>> > > Whereas with joda-time 2.9.3:
>> > > If we convert "2019-03-15 13:56:12", which is in "yyyy-MM-dd HH:mm:ss"
>> format, to the "hh:mm:ss yy-MMM-dd z" format, the converted value is "01:56:12
>> 19-Mar-15 PDT".
>> > >
>> > > The javadoc for the 'z' pattern in DateTimeFormat doesn't seem to differ
>> between the two versions, though.
>> > >
>> > > Even though the javadoc for both versions says time zone names ('z')
>> cannot be parsed, we are able to parse them in
>> joda-time 2.9.3.
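
(The observation above can be reproduced with a few lines of joda-time; a
sketch, where the America/Los_Angeles zone is an assumption based on the
-07:00/PDT outputs reported:)

import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

DateTimeZone zone = DateTimeZone.forID("America/Los_Angeles");
DateTimeFormatter input =
    DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").withZone(zone);
DateTimeFormatter output =
    DateTimeFormat.forPattern("hh:mm:ss yy-MMM-dd z").withZone(zone);

System.out.println(output.print(input.parseDateTime("2019-03-15 13:56:12")));
// joda-time 2.4   -> 01:56:12 19-Mar-15 -07:00  (offset printed for 'z')
// joda-time 2.9.3 -> 01:56:12 19-Mar-15 PDT     (zone name printed for 'z')
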
>> > >
>> > > Also, will joda-time be replaced with java.time in Beam 3?
>> > >
>> > > Thanks,
>> > > Rahul
>> > >
>> > > On Thu, Mar 21, 2019 at 5:37 PM David Morávek <
>> david.mora...@gmail.com> wrote:
>> > >>
>> > >> Hello Rahul, are there any incompatibilities you are running into
>> with spark version? These versions should be backward compatible.
>> > >>
>> > >> From the joda-time doc:
>> > >> The main public API will remain backwards compatible for both source
>> and binary in the 2.x stream.
>> > >>
>> > >> This means you should be able to safely use Spark's version.
>> > >>
>> > >> D.
>> > >>
>> > >> On Thu, Mar 21, 2019 at 5:45 AM rahul patwari <
>> rahulpatwari8...@gmail.com> wrote:
>> > >>>
>> > >>> Hi Ismael,
>> > >>>
>> > >>> We are using Beam with Spark Runner and Spark 2.4 has joda-time
>> 2.9.3 as a dependency. So, we have used joda-time 2.9.3 in our shaded
>> artifact set. As Beam has joda-time 2.4 as a dependency, I was wondering
>> whether it would break anything in Beam.
>> > >>>
>> > >>> Will joda-time be replaced with java time in Beam 3? What is the
>> expected release date of Beam 3?
>> > >>>
>> > >>> Thanks,
>> > >>> Rahul
>> > >>>
>> > >>> On Wed, Mar 20, 2019 at 7:23 PM Ismaël Mejía 
>> wrote:
>> > >>>>
>> > >>>> Hello,
>> > >>>>
>> > >>>> The long term goal would be to get rid of joda-time but that won't
>> > >>>> happen until Beam 3.
>> > >>>> Any 'particular' reason or motivation to push the upgrade?
>> > >>>>
>> > >>>> Regards,
>> > >>>> Ismaël
>> > >>>>
>> > >>>> On Wed, Mar 20, 2019 at 11:53 AM rahul patwari
>> > >>>>  wrote:
>> > >>>> >
>> > >>>> > Hi,
>> > >>>> >
>> > >>>> > Is there a plan to upgrade the dependency version of joda-time
>> to 2.9.3 or latest version?
>> > >>>> >
>> > >>>> >
>> > >>>> > Thanks,
>> > >>>> > Rahul
>>
>


Re: joda-time dependency version

2019-03-21 Thread David Morávek
Hello Rahul, are there any incompatibilities you are running into with the
Spark version? These versions should be backward compatible.

From the joda-time doc:
The main public API will remain backwards compatible for both source and
binary in the 2.x stream.

This means you should be able to safely use Spark's version.

D.

On Thu, Mar 21, 2019 at 5:45 AM rahul patwari 
wrote:

> Hi Ismael,
>
> We are using Beam with Spark Runner and Spark 2.4 has joda-time 2.9.3 as a
> dependency. So, we have used joda-time 2.9.3 in our shaded artifact set. As
> Beam has joda-time 2.4 as a dependency, I was wondering whether it would
> break anything in Beam.
>
> Will joda-time be replaced with java time in Beam 3? What is the expected
> release date of Beam 3?
>
> Thanks,
> Rahul
>
> On Wed, Mar 20, 2019 at 7:23 PM Ismaël Mejía  wrote:
>
>> Hello,
>>
>> The long term goal would be to get rid of joda-time but that won't
>> happen until Beam 3.
>> Any 'particular' reason or motivation to push the upgrade?
>>
>> Regards,
>> Ismaël
>>
>> On Wed, Mar 20, 2019 at 11:53 AM rahul patwari
>>  wrote:
>> >
>> > Hi,
>> >
>> > Is there a plan to upgrade the dependency version of joda-time to 2.9.3
>> or latest version?
>> >
>> >
>> > Thanks,
>> > Rahul
>>
>


Re: Moving to spark 2.4

2018-12-07 Thread David Morávek
+1 for waiting for HDP and CDH adoption

Sent from my iPhone

> On 7 Dec 2018, at 16:38, Alexey Romanenko  wrote:
> 
> I agree with Ismael and I’d wait until the new Spark version will be 
> supported by major BigData distributors.
> 
>> On 7 Dec 2018, at 14:57, Vishwas Bm  wrote:
>> 
>> Hi Ismael,
>> 
>> We have upgraded Spark to 2.4.
>> In our setup we ran a few basic tests and found it to be pretty stable.
>> 
>> 
>> Thanks & Regards,
>> Vishwas 
>> 
>> 
>>> On Fri, Dec 7, 2018 at 2:53 PM Ismaël Mejía  wrote:
>>> Hello Vishwas,
>>> 
>>> The Spark dependency in the Spark runner is "provided", so you can
>>> already pass in the Spark 2.4 dependencies and it should work out of
>>> the box.
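
(Concretely, that means the application's own build supplies Spark; a sketch of
the application POM, where the artifact ids and versions are assumptions:)

<!-- Beam's Spark runner; Spark itself is a "provided" dependency of it -->
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-spark</artifactId>
  <version>2.9.0</version>
</dependency>
<!-- so the application picks the Spark version, e.g. 2.4.0: -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.4.0</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.11</artifactId>
  <version>2.4.0</version>
</dependency>
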
>>> 
>>> JB did a PR to upgrade the version of Spark in the runner, but maybe
>>> it is worth waiting a bit before merging it, at least until some of
>>> the big data distributions have Spark 2.4.x support available; so far
>>> nobody has upgraded (well, apart from Databricks).
>>> 
>>> What do others think? Should we move ahead, or are you aware of any
>>> issue introduced by version 2.4.0? (Notice that the PR just updates
>>> the version, so code compatibility should be OK.)
>>> 
>>> Ismaël
>>> 
>>> On Thu, Dec 6, 2018 at 12:14 PM Jean-Baptiste Onofré  
>>> wrote:
>>> >
>>> > Hi Vishwas
>>> >
>>> > Yes, I already started the update.
>>> >
>>> > Regards
>>> > JB
>>> >
>>> > On 06/12/2018 07:39, Vishwas Bm wrote:
>>> > > Hi,
>>> > >
>>> > > Currently I see that the Spark version dependency used in Beam is
>>> > > "2.3.2".
>>> > > As Spark 2.4 is released now, is there a plan to upgrade the Beam Spark
>>> > > dependency?
>>> > >
>>> > >
>>> > > Thanks & Regards,
>>> > > Vishwas
>>> > > Mob : 9164886653
>>> >
>>> > --
>>> > Jean-Baptiste Onofré
>>> > jbono...@apache.org
>>> > http://blog.nanthrax.net
>>> > Talend - http://www.talend.com
>