Were you looking for this? https://mail-archives.apache.org/mod_mbox/beam-user/201605.mbox/%3ccagwr7sbr9x7voyuzjhymysb_5_tdugqwudz4sdhop4equ2g...@mail.gmail.com%3E
> Didn't cut it... Exceptions complaining "KafkaConsumer is not safe for
> multi-threaded access".

btw, Thomas sent me the full thread stack for this exception. I have a fix
for it here: https://github.com/apache/incubator-beam/pull/290 . It would be
very helpful if you could check that it works in your app.

Raghu.

On Thu, May 5, 2016 at 7:10 AM, amir bahmanyari <[email protected]> wrote:

Hi Raghu,
I noticed you replied to this thread yesterday regarding users getting
affected by this Kafka version difference etc., and the Google Dataflow libs
working fine for unbounded KafkaIO etc.
I don't see that email in my inbox anymore. I might have accidentally deleted
it. Could you resend it please? I appreciate it.
Have a great day.

------------------------------
From: amir bahmanyari <[email protected]>
To: "[email protected]" <[email protected]>
Sent: Wednesday, May 4, 2016 12:57 PM
Subject: Re: KafkaIO Usage & Sample Code

The root cause ended up being the Kafka version in my lab. The Kafka server
must be version 0.9+ for the KafkaIO call in the Beam app code to populate
the PCollection object. Thanks so much to Thomas for diagnosing that, and for
all the valuable time he spent with me offline.

Status:
Bounded: works when setting either InProcessPipelineRunner or
DirectPipelineRunner.
Unbounded: throws different exceptions for the two runners above. Thomas has
the stack traces for both.

At the moment I have withMaxNumRecords(100) set in my KafkaIO call. This
causes the call to block until all 100 records are received, and only then
makes them available to the app. I tried running p.run() in a while(true)
loop with withMaxNumRecords(100) set, so that I would get chunks of 100
records in multiple files created by TextIO. That didn't cut it: I got
exceptions complaining "KafkaConsumer is not safe for multi-threaded access".

I know unbounded mode is the ultimate way to achieve true real-time
streaming. Given that unbounded is not available at the moment, is there a
workaround that makes every single record available to the app immediately?

Thanks everyone again for your valuable help.
Amir-
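For reference, here is a minimal sketch of that bounded workaround, built
only from the calls quoted in this thread (KafkaIO.read() with
withBootstrapServers, withTopics and withMaxNumRecords, plus a single
p.run()). The import paths and the topic name are assumptions based on the
2016 incubator-beam snapshots mentioned later in the thread and may not match
your checkout exactly:

    import java.util.Arrays;
    import java.util.List;

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class BoundedKafkaReadSketch {
      public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.create();
        Pipeline p = Pipeline.create(options);

        // Hypothetical topic list; replace with your own topics.
        List<String> topics = Arrays.asList("beam-test-topic");

        // withMaxNumRecords(100) turns the unbounded Kafka source into a
        // bounded read of 100 records, which is what lets it run on runners
        // that do not yet support unbounded PCollections.
        p.apply(KafkaIO.read()
            .withBootstrapServers("kirk:9092")
            .withTopics(topics)
            .withMaxNumRecords(100));

        // Run the pipeline once. The while(true) re-run approach described
        // above hit "KafkaConsumer is not safe for multi-threaded access"
        // exceptions; Raghu's PR 290 addresses that.
        p.run();
      }
    }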
------------------------------
From: Thomas Groh <[email protected]>
To: [email protected]
Sent: Monday, May 2, 2016 11:41 AM
Subject: Re: KafkaIO Usage & Sample Code

Yeah, JB has it - if that commit
(https://github.com/apache/incubator-beam/commit/b2b77e380) is in your
history, the call should compile correctly; if it's not, then the
InProcessPipelineRunner doesn't implement the appropriate interface, and that
call won't typecheck (and will lead to your compilation failure). Syncing to
a more recent version should fix the problem.

On Mon, May 2, 2016 at 11:26 AM, Jean-Baptiste Onofré <[email protected]> wrote:

Thomas meant that you have to check out at or after this commit:

git checkout b2b77e380

;)

Regards
JB

On 05/02/2016 07:52 PM, amir bahmanyari wrote:

Hi Dan,
Sorry! I honestly don't know what "so long as you're on a commit after
b2b77e380" means :)))
If this means there is a specific jar file I need to have on my path, could
you point me to a link where I can get the right jar file please?
I appreciate it, sir.
Amir

------------------------------
From: Thomas Groh <[email protected]>
To: [email protected]
Sent: Monday, May 2, 2016 10:02 AM
Subject: Re: KafkaIO Usage & Sample Code

Your calls should work - so long as you're on a commit after b2b77e380 (when
we started implementing PipelineRunner), the InProcessPipelineRunner should
be a valid argument to PipelineOptions#setRunner.

As an example, there's the InProcessPipelineRunnerTest:
https://github.com/apache/incubator-beam/blob/b9116ac426f989af882e6df5dafc5da6c9f203d8/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java#L73

On Mon, May 2, 2016 at 9:43 AM, Dan Halperin <[email protected]> wrote:

Hi Amir,

The problem is likely in using DataflowPipelineOptions.class -- this is
specific to the Cloud Dataflow service and the DataflowPipelineRunner. Try
using just "PipelineOptions".

Dan

On Mon, May 2, 2016 at 8:26 AM, amir bahmanyari <[email protected]> wrote:

Thanks Dan.
I actually had tried it before, but I got compilation errors when setting the
InProcessPipelineRunner in the PipelineOptions object. I'd appreciate it if
you could point me to working sample code.
FYI, this is my implementation:

import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;

DataflowPipelineOptions Myoptions =
    PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
Myoptions.setRunner(InProcessPipelineRunner.class);

I cannot set the runner to InProcessPipelineRunner in the last line:

The method setRunner(Class<? extends PipelineRunner<?>>) in the type
PipelineOptions is not applicable for the arguments
(Class<InProcessPipelineRunner>).

Thanks for your help.
Amir-

------------------------------
From: Dan Halperin <[email protected]>
To: [email protected]
Sent: Monday, May 2, 2016 12:23 AM
Subject: Re: KafkaIO Usage & Sample Code

On Mon, May 2, 2016 at 12:22 AM, Dan Halperin <[email protected]> wrote:

Hi Amir,

As Frances suggested, you can use the InProcessPipelineRunner instead of the
DirectPipelineRunner to execute your pipeline. (They're both in the codebase;
it's just that the Direct runner is the default. Use the --runner command
line option.)

Amending: it is relatively unlikely that the issues we caught in testing
would affect you, so it should be safe for your use case to do this -- and
definitely safe to at least try it out!

Dan
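Putting Dan's and Thomas's suggestions together, a minimal sketch of the
options setup they describe, assuming a checkout at or after commit
b2b77e380. The InProcessPipelineRunner package is taken from the test linked
above; the other import paths are assumptions based on the org.apache.beam
package names in the stack traces in this thread:

    import org.apache.beam.runners.direct.InProcessPipelineRunner;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class InProcessRunnerSetupSketch {
      public static void main(String[] args) {
        // Use the generic PipelineOptions rather than DataflowPipelineOptions,
        // which is specific to the Cloud Dataflow service and its runner.
        PipelineOptions options = PipelineOptionsFactory.create();

        // On a checkout at or after b2b77e380, InProcessPipelineRunner is a
        // valid argument to PipelineOptions#setRunner, so this call
        // typechecks; on older snapshots it produces the setRunner
        // compilation error quoted above.
        options.setRunner(InProcessPipelineRunner.class);

        Pipeline p = Pipeline.create(options);
        // ... apply transforms and call p.run() as usual ...
      }
    }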
On Mon, May 2, 2016 at 12:17 AM, Amir Bahmanyari <[email protected]> wrote:

Thanks gents,
What are our options in the meanwhile?
Cheers

Sent from my iPhone

On May 2, 2016, at 12:00 AM, Dan Halperin <[email protected]> wrote:

On Sun, May 1, 2016 at 10:29 PM, Jean-Baptiste Onofré <[email protected]> wrote:

Oh, thanks Frances.

I mixed up DirectPipelineRunner (the "old" local runner) and
InProcessPipelineRunner (the "new" local runner) ;)

We should remove the DirectPipelineRunner to avoid confusion. WDYT?

Regards
JB

We would like to do this soon, but there are some snags.

As a preparation step, Thomas swapped the default runner from Direct to
InProcess (#178 <https://github.com/apache/incubator-beam/pull/178>).

However, testing unfortunately exposed some issues with the InProcess runner.
(Actually, I should say "fortunately", because the tests caught it! Yay!) So
we had to roll it back (#198
<https://github.com/apache/incubator-beam/pull/198>).

Once we improve the InProcess runner, we can re-do the default swap. After
the swap, once the tests keep passing for a few days, we do indeed intend to
delete the current Direct pipeline runner and replace it with the current
InProcess runner.

Dan

On 05/02/2016 03:12 AM, Frances Perry wrote:

+Thomas, author of the InProcessPipelineRunner.

The DirectPipelineRunner doesn't yet support unbounded PCollections. You can
try using the InProcessPipelineRunner, which is the rewrite of local
execution that provides support for unbounded PCollections and better
checking against the Beam model. (We'll be renaming it to the
DirectPipelineRunner in the near future, to avoid having both, as soon as the
functionality of the InProcessPipelineRunner is complete.)

On Sun, May 1, 2016 at 4:38 PM, amir bahmanyari <[email protected]> wrote:

Hi JB,
I rebuilt my code with the latest:

kafka-0.1.0-incubating-20160501.070733-11.jar
https://repository.apache.org/content/groups/snapshots/org/apache/beam/kafka/0.1.0-incubating-SNAPSHOT/kafka-0.1.0-incubating-20160501.070733-11.jar

java-sdk-all-0.1.0-incubating-20160501.070453-25.jar
https://repository.apache.org/content/groups/snapshots/org/apache/beam/java-sdk-all/0.1.0-incubating-SNAPSHOT/java-sdk-all-0.1.0-incubating-20160501.070453-25.jar

Tried without setting withMaxNumRecords(): it throws

java.lang.IllegalStateException: no evaluator registered for
Read(UnboundedKafkaSource)
    at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
    at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
    at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)

With withMaxNumRecords() set, I see the thread is running, with no exceptions
like the above, waiting for incoming Kafka data, but the method obtaining the
data from processElement(ProcessContext ctx) never executes. Therefore,
nothing goes into apply(TextIO.Write.to("c:\\temp\\KafkaOut\\Kafkadata.txt")).

I see the Kafka broker reports my laptop's IP address as getting a connection
to it, OK. Everything looks OK on the server side. Doesn't look like it's my
lucky day.
I appreciate any help/feedback/suggestion.
Cheers
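For what it's worth, here is a rough sketch of the kind of bounded
Kafka-to-TextIO pipeline described above. Only the KafkaIO call,
withMaxNumRecords, TextIO.Write.to and processElement(ProcessContext) come
from this thread; the FormatRecordFn, the withoutMetadata() step, the element
types and the import paths are assumptions about the 2016 incubator-beam
snapshot and may need adjusting to your checkout:

    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;
    import java.util.List;

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.KV;

    public class KafkaToTextSketch {

      // Hypothetical formatter: turns each Kafka key/value pair into a line
      // of text so TextIO has something to write.
      static class FormatRecordFn extends DoFn<KV<byte[], byte[]>, String> {
        @Override
        public void processElement(ProcessContext ctx) throws Exception {
          byte[] value = ctx.element().getValue();
          ctx.output(value == null ? "" : new String(value, StandardCharsets.UTF_8));
        }
      }

      public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.create();
        Pipeline p = Pipeline.create(options);

        List<String> topics = Arrays.asList("beam-test-topic"); // hypothetical topic

        p.apply(KafkaIO.read()
                .withBootstrapServers("kirk:9092")
                .withTopics(topics)
                .withMaxNumRecords(100)  // bounded workaround from this thread
                .withoutMetadata())      // assumed helper yielding KV pairs
         .apply(ParDo.of(new FormatRecordFn()))
         .apply(TextIO.Write.to("c:\\temp\\KafkaOut\\Kafkadata.txt"));

        p.run();
      }
    }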
------------------------------
From: Jean-Baptiste Onofré <[email protected]>
To: [email protected]
Sent: Friday, April 29, 2016 10:36 PM
Subject: Re: KafkaIO Usage & Sample Code

As I said in my previous e-mail, until recently the DirectPipelineRunner
didn't support Unbounded.

It's now fixed, so if you take a latest nightly build, or build master, it
should work.

As a workaround, you can also limit the number of messages consumed from
Kafka (and so work with a bounded collection).

Regards
JB

On 04/29/2016 07:12 PM, amir bahmanyari wrote:

Hi colleagues,
I am moving this conversation to this users mailing list as per Max's
suggestion. Thanks Max.

Hi JB,
Hope all is great. Is there a resolution to the exception I sent last night,
please? When will the sample code for using KafkaIO be released? I really
appreciate your valuable time. Below is the exception for your reference.

This is how it gets used in my code:

p.apply(KafkaIO.read().withBootstrapServers("kirk:9092").withTopics(topics));

Have a wonderful weekend.

Exception in thread "main" java.lang.IllegalStateException: no evaluator
registered for Read(UnboundedKafkaSource)
    at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
    at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
    at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
    at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:104)
    at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:261)
    at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:860)
    at org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:572)
    at org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:106)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
    at benchmark.flinkspark.flink.ReadFromKafka2.main(ReadFromKafka2.java:286)

Kind Regards,
Amir

--
Jean-Baptiste Onofré
[email protected]
http://blog.nanthrax.net
Talend - http://www.talend.com
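Tying the pieces of this thread together: the exception above comes from
handing the DirectPipelineRunner an unbounded Kafka read. A rough sketch of
the unbounded alternative discussed here; the runner's import path is taken
from the test linked earlier, the other imports and the topic name are
assumptions, and note that per the status update at the top of the thread the
unbounded path still hit exceptions at this point, pending the fix in PR 290:

    import java.util.Arrays;
    import java.util.List;

    import org.apache.beam.runners.direct.InProcessPipelineRunner;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class UnboundedKafkaReadSketch {
      public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.create();

        // The DirectPipelineRunner has no evaluator for unbounded reads,
        // which is what produces "no evaluator registered for
        // Read(UnboundedKafkaSource)" above. The InProcessPipelineRunner is
        // the local runner that supports unbounded PCollections.
        options.setRunner(InProcessPipelineRunner.class);

        Pipeline p = Pipeline.create(options);

        List<String> topics = Arrays.asList("beam-test-topic"); // hypothetical topic

        // No withMaxNumRecords(...) here: this stays an unbounded read.
        p.apply(KafkaIO.read()
            .withBootstrapServers("kirk:9092")
            .withTopics(topics));

        p.run();
      }
    }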
