Hi Amir,

I suggest these changes:

 - Add a dependency on `beam-sdks-java-core`. This is the main artifact for
use in Java.
 - Remove the dependency on `beam-runners-core-java`. This is an artifact
for runner authors. You shouldn't need it.
 - Add a runtime-scoped dependency on `beam-runners-direct-java` for local
testing.
 - Add a runtime-scoped dependency on another runner. From your other
emails, it seems you might want `beam-runners-flink_2.10`.

Side note: Beam 0.2.0-incubating is released.

Kenn

On Tue, Aug 9, 2016 at 10:42 AM, amir bahmanyari <[email protected]>
wrote:

> Hi Kenn,
> The following is an extract from my pom. It works for me as it is,
> but I'd appreciate any suggestions for improvement.
> Thanks+have a great day.
> Cheers
>
>     <dependencies>
>         <dependency>
>             <groupId>org.apache.beam</groupId>
>             <artifactId>beam-runners-core-java</artifactId>
>             <version>0.1.0-incubating</version>
>         </dependency>
>
>         <dependency>
>             <groupId>biz.paluch.redis</groupId>
>             <artifactId>lettuce</artifactId>
>             <version>3.4.3.Final</version>
>         </dependency>
>
>         <dependency>
>             <groupId>org.apache.beam</groupId>
>             <artifactId>flink-runner_2.10</artifactId>
>             <version>0.1.0-incubating-SNAPSHOT</version>
>         </dependency>
>
>         <dependency>
>             <groupId>org.apache.beam</groupId>
>             <artifactId>beam-sdks-java-io-kafka</artifactId>
>             <version>0.1.0-incubating</version>
>         </dependency>
>     </dependencies>
>
>     <build>
>         <plugins>
>             <plugin>
>                 <groupId>org.apache.maven.plugins</groupId>
>                 <artifactId>maven-shade-plugin</artifactId>
>                 <version>2.4.1</version>
>                 <executions>
>                     <execution>
>                         <phase>package</phase>
>                         <goals>
>                             <goal>shade</goal>
>                         </goals>
>                         <configuration>
>                             <transformers>
>                                 <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
>                                     <mainClass>benchmark.flinkspark.flink.BenchBeamRunners</mainClass>
>                                 </transformer>
>                             </transformers>
>                             <artifactSet>
>                                 <excludes>
>                                     <exclude>org.apache.flink:*</exclude>
>                                 </excludes>
>                             </artifactSet>
>                         </configuration>
>                     </execution>
>                 </executions>
>             </plugin>
>             <plugin>
>                 <groupId>org.apache.maven.plugins</groupId>
>                 <artifactId>maven-compiler-plugin</artifactId>
>                 <version>3.1</version>
>                 <configuration>
>                     <source>1.8</source>
>                     <target>1.8</target>
>                 </configuration>
>             </plugin>
>         </plugins>
>
>     </build>
> </project>
>
>
>
> ------------------------------
> *From:* Kenneth Knowles <[email protected]>
> *To:* [email protected]; amir bahmanyari <[email protected]>
>
> *Sent:* Monday, August 8, 2016 7:29 PM
> *Subject:* Re: Is Beam pipeline runtime behavior inconsistent?
>
> Hi Amir,
>
> How are you assembling your classpath? The library that contains that
> class should be automatically included.
>
> Kenn
>
> On Mon, Aug 8, 2016 at 5:30 PM, amir bahmanyari <[email protected]>
> wrote:
>
> Yes :) I figured it out and it compiled.
> Now I get ClassNotFound at runtime:
> org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn
> Researching the right runners package...
> Thanks 1000000000000 Thomas...hope I will see reasonable results at least
> just to convince my own team why we get different results at different runs
> for a GEEAD REASON..
> Cheers
>
> ------------------------------
> *From:* Thomas Groh <[email protected]>
> *To:* [email protected] ; amir bahmanyari <
> [email protected]>
> *Sent:* Monday, August 8, 2016 5:26 PM
> *Subject:* Re: Is Beam pipeline runtime behavior inconsistent?
>
> Missed a ParDo; that last line should be .apply(ParDo.of(new DoFn...))
>
> On Mon, Aug 8, 2016 at 4:56 PM, amir bahmanyari <[email protected]>
> wrote:
>
> Hi Thomas,
> I used the following & get a compilation error:
> "The method apply(PTransform<? super PCollection<Iterable<String>>,
> OutputT>) in the type PCollection<Iterable<String>> is not applicable for
> the arguments (new DoFn<Iterable<String>,String>(){})"
>
> PCollection<String> kafkarecords = p
>     .apply(KafkaIO.read()
>         .withBootstrapServers("kafkahost:9092")
>         .withTopics(topics)
>         .withValueCoder(StringUtf8Coder.of())
>         .withoutMetadata())
>     .apply(Values.<String>create())
>     .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
>         .triggering(AfterWatermark.pastEndOfWindow())
>         .withAllowedLateness(Duration.ZERO)
>         .discardingFiredPanes());
>
> kafkarecords.apply(WithKeys.<Integer, String>of(1))
>     .apply(GroupByKey.<Integer, String>create())
>     .apply(Values.<Iterable<String>>create())
>     .apply(new DoFn<Iterable<String>, String>() {......
>
>
> I compared with some GroupByKey examples ...nothing that matched it.
> Should I simplify the KafkaIO() call to avoid this compilation error?
> Thanks for your help.
> Amir-
> ------------------------------
> *From:* Thomas Groh <[email protected]>
> *To:* [email protected] ; amir bahmanyari <
> [email protected]>
> *Sent:* Monday, August 8, 2016 3:50 PM
>
> *Subject:* Re: Is Beam pipeline runtime behavior inconsistent?
>
> You would get performance no better than single-threaded behavior if you
> group everything into a single key, which is why this approach is strongly
> discouraged. You can still get continuous output, depending on the
> triggering, but you lose all of the scaling benefits of running a pipeline
> as opposed to a simple Java program, and may incur some additional overhead.
>
> To enforce this sort of threading you would do something along the lines
> of:
>
> kafkarecords.apply(WithKeys.<Integer, String>of(1))
>     .apply(GroupByKey.<Integer, String>create())
>     .apply(Values.<Iterable<String>>create())
>     .apply(new DoFn<Iterable<String>, String>() {...});
>
> where the DoFn unrolls its input and applies the processing to each
> element.
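
Stripped of Beam, the shape of that approach can be sketched in plain Java. This is only an illustration under stated assumptions: `SingleKeyGroupingSketch`, `STATIC_KEY`, and `processSequentially` are hypothetical names, the "processing" is a stand-in string prefix, and none of this is Beam API. It shows every record being keyed to one static key, grouped, and then handled one at a time by iterating the single resulting group.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical illustration only: plain Java collections, no Beam classes.
final class SingleKeyGroupingSketch {

    // The single static key that every record is assigned to.
    static final Integer STATIC_KEY = 1;

    // Keys every record to STATIC_KEY, groups by key, then walks the one
    // resulting group sequentially, applying a stand-in processing step.
    static List<String> processSequentially(List<String> records) {
        Map<Integer, List<String>> grouped = records.stream()
                .collect(Collectors.groupingBy(record -> STATIC_KEY));

        List<String> results = new ArrayList<String>();
        // Only one group exists, so this loop handles one record at a time.
        for (String record : grouped.getOrDefault(STATIC_KEY, new ArrayList<String>())) {
            results.add("processed:" + record);
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(processSequentially(Arrays.asList("a", "b", "c")));
    }
}
```

Because all records land in one group, there is nothing left to spread across workers, which is the bottleneck described above.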
>
>
> On Mon, Aug 8, 2016 at 2:37 PM, amir bahmanyari <[email protected]>
> wrote:
>
> Thanks so much Thomas.
> Fantastic answer & great learning about what's really going on underneath
> the hood.
> I have a question on your suggestion: "To do so, you would key the inputs
> to a single static key and apply a GroupByKey, running the processing
> method on the output Iterable produced by the GroupByKey"...
> Wouldn't doing so defeat the "real-time streaming" objectives?
> To me the above leads to a simulation of a simple single-threaded Java
> process, but one executing in a massively parallel infrastructure in
> a "fancy" way :-)
> Is there an example that demonstrates how to actually implement your
> suggestion above without any hidden loopholes pls?
> I can at least try it and see how far it gets for R&D purposes & share the
> results with the community.
> Cheers+have a wonderful day.
>
> ------------------------------
> *From:* Thomas Groh <[email protected]>
> *To:* [email protected] ; amir bahmanyari <
> [email protected]>
> *Sent:* Monday, August 8, 2016 1:44 PM
> *Subject:* Re: Is Beam pipeline runtime behavior inconsistent?
>
> There's no way to guarantee that exactly one record is processed at a
> time. This is part of the design of ParDo to work efficiently across
> multiple processes and machines[1], where multiple instances of a DoFn must
> exist in order for progress to be made in a timely fashion. This includes
> processing the same element across multiple machines at the same time, with
> only one of the results being available in the output PCollection, as well
> as retries of failed elements.
>
> A runner is required to interact with a DoFn instance in a single-threaded
> manner - however, it is permitted to have multiple different DoFn instances
> active within a single process and across processes at any given time (for
> the same reasons as above). There's no support in the Beam model to
> restrict this type of execution. We do not encourage sharing objects
> between DoFn instances, and any shared state must be accessed in a
> thread-safe manner, and modifications to shared state should be idempotent,
> as otherwise retries and speculative execution may cause that state to be
> inconsistent. A DoFn will be reused for multiple elements across a single
> bundle, and may be reused across multiple bundles - if you require the DoFn
> to be "fresh" per element, it should perform any required setup at the
> start of the ProcessElement method.
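
A minimal plain-Java sketch of why the idempotence point above matters, assuming a runner that processes the same bundle more than once (for example, one retry). The names `RetrySafeStateSketch`, `countWithIncrement`, and `countWithKeyedPut` are hypothetical, nothing here is Beam API, and the sketch assumes each record carries a unique identifier.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration only: simulates retried bundles in plain Java.
final class RetrySafeStateSketch {

    // Non-idempotent update: each processing attempt increments a shared
    // counter. The update is thread-safe, but a retried bundle is counted again.
    static long countWithIncrement(List<String> bundle, int attempts) {
        AtomicLong counter = new AtomicLong();
        for (int attempt = 0; attempt < attempts; attempt++) {
            for (String record : bundle) {
                counter.incrementAndGet();
            }
        }
        return counter.get();
    }

    // Idempotent update: shared state is keyed by the record's identifier,
    // so processing the same record again leaves the state unchanged.
    static int countWithKeyedPut(List<String> bundle, int attempts) {
        Map<String, Boolean> seen = new ConcurrentHashMap<String, Boolean>();
        for (int attempt = 0; attempt < attempts; attempt++) {
            for (String record : bundle) {
                seen.putIfAbsent(record, Boolean.TRUE);
            }
        }
        return seen.size();
    }

    public static void main(String[] args) {
        List<String> bundle = Arrays.asList("r1", "r2", "r3");
        System.out.println(countWithIncrement(bundle, 2)); // prints 6
        System.out.println(countWithKeyedPut(bundle, 2));  // prints 3
    }
}
```

With a three-record bundle and two attempts, the incrementing counter ends at 6 while the keyed map holds 3 entries. Synchronization alone does not prevent the first outcome; only the idempotent update gives the same result regardless of how many times the bundle runs.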
>
> The best that can be done if it is absolutely required to restrict
> processing to a single element at a time would be to group all of the
> elements to a single key. Note that this will not solve the problem in all
> cases, as a runner is permitted to execute the group of elements multiple
> times so long as it only takes one completed bundle as the result, and
> additionally this removes the ability of the runner to balance work and
> introduces a performance bottleneck. To do so, you would key the inputs to
> a single static key and apply a GroupByKey, running the processing method
> on the output Iterable produced by the GroupByKey (directly; expanding the
> input iterable in a separate PCollection allows a runner to rebalance the
> elements, which will reintroduce parallelism).
>
> [1] https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java#L360
>
> On Mon, Aug 8, 2016 at 12:46 PM, amir bahmanyari <[email protected]>
> wrote:
>
> Hi Thomas,
> Thanks so much for your response. Here are answers to your questions.
> You have a specific collection of records stored in Kafka. You run your
> pipeline, and observe duplicate elements. Is that accurate?
>
> ==>> I send records to Kafka from my laptop. I use KafkaIO() to receive
> the records. I have confirmed that I don't get duplicates from Kafka.
> However, for some reason, certain parts of my code execute more times than
> the expected number of records, and subsequently produce extra resulting
> data.
> I tried playing with the triggering: stretching the window interval,
> DiscardingFiredPanes, etc., all kinds of modes. Same result.
> How can I guarantee that one record at a time executes in one
> unique instance of the inner class object?
> I have all the shared objects synchronized and am using Java concurrent
> hash maps. How can I guarantee synchronized operations amongst "parallel
> pipelines"? It is analogous to multiple threads accessing a shared object
> and trying to modify it...
>
> Here is my current KafkaIO() call:
> PCollection<String> kafkarecords = p
>     .apply(KafkaIO.read()
>         .withBootstrapServers("kafkahost:9092")
>         .withTopics(topics)
>         .withValueCoder(StringUtf8Coder.of())
>         .withoutMetadata())
>     .apply(Values.<String>create())
>     .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
>         .triggering(AfterWatermark.pastEndOfWindow())
>         .withAllowedLateness(Duration.ZERO)
>         .discardingFiredPanes());
>
> kafkarecords.apply(ParDo.named("ProcessLRKafkaData").of(new DoFn<String, String>() {
>     // I expect one record at a time to one object here
> ------------------------------
>
> Have you confirmed that you're getting duplicate records via other library
> transforms (such as applying Count.globally() to kafkarecords)?
> ==>>No duplicates from Kafka.
> ------------------------------
> Additionally, I'm not sure what you mean by "executes till a record lands
> on method"
> ==>>Sorry for my confusing statement. As I mentioned above, I expected
> each record coming from Kafka to be assigned to one instance of the inner
> class, and therefore to be executed by one instance of the pipeline in
> parallel with others executing their own unique records.
>
> ------------------------------ ------------------------------
> ------------------------------ ------------------------------
> -----------------------
>
> Additionally additionally, is this reproducible if you execute with the
> DirectRunner?
> ==>>I have not tried DirectRunner. Should I?
>
> Thanks so much Thomas.
>
>
> ------------------------------
> *From:* Thomas Groh <[email protected]>
> *To:* [email protected] ; amir bahmanyari <
> [email protected]>
> *Sent:* Monday, August 8, 2016 11:43 AM
> *Subject:* Re: Is Beam pipeline runtime behavior inconsistent?
>
> Just to make sure I understand the problem:
>
> You have a specific collection of records stored in Kafka. You run your
> pipeline, and observe duplicate elements. Is that accurate?
>
> Have you confirmed that you're getting duplicate records via other library
> transforms (such as applying Count.globally() to kafkarecords)?
>
> Additionally, I'm not sure what you mean by "executes till a record lands
> on method"
>
> Additionally additionally, is this reproducible if you execute with the
> DirectRunner?
>
>
> On Sun, Aug 7, 2016 at 11:44 PM, amir bahmanyari <[email protected]>
> wrote:
>
> Hi Colleagues,
> I refrained from posting this email before completing thorough testing.
> I think I did.
> My core code works perfectly & produces the expected result every single
> time without wrapping it with Beam KafkaIO to receive the data.
> Without KafkaIO, it receives the records from a flat data file. I repeated
> it and it always produced the right result.
> When I include Beam KafkaIO and embed the exact same code in an anonymous
> class running Beam pipelines, I get a different result every time I rerun
> it.
> Below is the snippet from where KafkaIO executes till a record lands on
> method.
> Kafka sends the precise number of records. No duplicates. All good.
> While executing in Beam, when the records are finished & I expect a
> correct result, it always produces something different.
> Different in different runs.
> I appreciate shedding light on this issue.  And thanks for your valuable
> time as always.
> Amir-
>
> public static synchronized void main(String[] args) throws Exception {
>
>     // Create Beam options for the Flink runner.
>     FlinkPipelineOptions options =
>         PipelineOptionsFactory.as(FlinkPipelineOptions.class);
>     // Set the streaming engine to the Flink runner.
>     options.setRunner(FlinkPipelineRunner.class);
>     // This is a streaming process (as opposed to batch).
>     options.setStreaming(true);
>     // Create the DAG pipeline for parallel processing of independent LR records.
>     Pipeline p = Pipeline.create(options);
>     // The Kafka broker topic is "lroad".
>     List<String> topics = Arrays.asList("lroad");
>
>     PCollection<String> kafkarecords = p
>         .apply(KafkaIO.read()
>             .withBootstrapServers("kafkahost:9092")
>             .withTopics(topics)
>             .withValueCoder(StringUtf8Coder.of())
>             .withoutMetadata())
>         .apply(Values.<String>create())
>         .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
>             .triggering(AfterWatermark.pastEndOfWindow())
>             .withAllowedLateness(Duration.ZERO)
>             .accumulatingFiredPanes());
>
>     kafkarecords.apply(ParDo.named("ProcessLRKafkaData").of(new DoFn<String, String>() {
>
>         public void processElement(ProcessContext ctx) throws Exception {
>             // My core logic code here.
>         }
>     }));
>     .
>     .
>     p.run(); // Start the Beam pipeline(s) in the Flink cluster
> } // of main
> } // of class
>
