Re: Apache Beam on GCP / Forcing to use py 3.11

2024-06-16 Thread Utkarsh Parekh
It looks like “mypackage” is built incorrectly. Please check how it is packaged and confirm.
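
For reference, a minimal sketch of a correctly packaged local dependency, assuming the layout described below (the project name and version are placeholder assumptions):

import setuptools

# find_packages() picks up 'mypackage' because it contains an __init__.py.
setuptools.setup(
    name='dftester',    # placeholder
    version='0.0.1',    # placeholder
    packages=setuptools.find_packages(),
)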

Utkarsh

On Sun, Jun 16, 2024 at 12:48 PM Sofia’s World  wrote:

> The error is the same - see bottom.
> I have tried to ssh into the container, and the directory is set up as
> expected, so I am not quite sure where the issue is.
> I will try to start from the pipeline-with-dependencies sample and work
> it out from there without bothering the list.
>
> Thanks again for following up.
>  Marco
>
> Could not load main session. Inspect which external dependencies are used
> in the main module of your pipeline. Verify that corresponding packages are
> installed in the pipeline runtime environment and their installed versions
> match the versions used in pipeline submission environment. For more
> information, see:
> https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/
> Traceback (most recent call last):
>   File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker_main.py", line 115, in create_harness
>     _load_main_session(semi_persistent_directory)
>   File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker_main.py", line 354, in _load_main_session
>     pickler.load_session(session_file)
>   File "/usr/local/lib/python3.11/site-packages/apache_beam/internal/pickler.py", line 65, in load_session
>     return desired_pickle_lib.load_session(file_path)
>   File "/usr/local/lib/python3.11/site-packages/apache_beam/internal/dill_pickler.py", line 446, in load_session
>     return dill.load_session(file_path)
>   File "/usr/local/lib/python3.11/site-packages/dill/_dill.py", line 368, in load_session
>     module = unpickler.load()
>   File "/usr/local/lib/python3.11/site-packages/dill/_dill.py", line 472, in load
>     obj = StockUnpickler.load(self)
>   File "/usr/local/lib/python3.11/site-packages/dill/_dill.py", line 827, in _import_module
>     return getattr(__import__(module, None, None, [obj]), obj)
> ModuleNotFoundError: No module named 'mypackage'
>
>
>
> On Sun, 16 Jun 2024, 14:50 XQ Hu via user,  wrote:
>
>> What is the error message now?
>> You can easily ssh into your docker container and check that everything
>> is installed correctly with:
>> docker run --rm -it --entrypoint=/bin/bash $CUSTOM_CONTAINER_IMAGE
>>
>>
>> On Sun, Jun 16, 2024 at 5:18 AM Sofia’s World 
>> wrote:
>>
>>> Valentyn, many thanks... I actually spotted the reference in the setup
>>> file.
>>> However, after correcting it, I am still at square one: somehow my
>>> runtime environment does not see it. So I added some debugging to my
>>> Dockerfile to check whether I forgot to copy something,
>>> and here is the output, where I can see that mypackage has been copied.
>>>
>>> here is my directory structure:
>>>
>>> mypackage
>>>     __init__.py
>>>     obbutils.py
>>>     launcher.py
>>> __init__.py
>>> dataflow_tester.py
>>> setup_dftester.py (copied to setup.py)
>>>
>>> I can see the directory structure has been maintained when I copy my
>>> files into Docker, as I added some debug steps to my Dockerfile:
>>>
>>> Step #0 - "dftester-image": Removing intermediate container 4c4e763289d2
>>> Step #0 - "dftester-image":  ---> cda378f70a9e
>>> Step #0 - "dftester-image": Step 6/23 : COPY requirements.txt .
>>> Step #0 - "dftester-image":  ---> 9a43da08b013
>>> Step #0 - "dftester-image": Step 7/23 : COPY setup_dftester.py setup.py
>>> Step #0 - "dftester-image":  ---> 5a6bf71df052
>>> Step #0 - "dftester-image": Step 8/23 : COPY dataflow_tester.py .
>>> Step #0 - "dftester-image":  ---> 82cfe1f1f9ed
>>> Step #0 - "dftester-image": Step 9/23 : COPY mypackage mypackage
>>> Step #0 - "dftester-image":  ---> d86497b791d0
>>> Step #0 - "dftester-image": Step 10/23 : COPY __init__.py
>>> ${WORKDIR}/__init__.py
>>> Step #0 - "dftester-image":  ---> 337d149d64c7
>>> Step #0 - "dftester-image": Step 11/23 : RUN echo '- listing workdir'
>>> Step #0 - "dftester-image":  ---> Running in 9d97d8a64319
>>> Step #0 - "dftester-image": - listing workdir
>>> Step #0 - "dftester-image": Removing intermediate container 9d97d8a64319
>>> Step #0 - "dftester-image":  ---> bc9a6a2aa462
>>> Step #0 - "dftester-image": Step 12/23 : RUN ls -la ${WORKDIR}
>>> Step #0 - "dftester-image":  ---> Running in cf164108f9d6
>>> Step #0 - "dftester-image": total 24
>>> Step #0 - "dftester-image": drwxr-xr-x 1 root root 4096 Jun 16 08:59 .
>>> Step #0 - "dftester-image": drwxr-xr-x 1 root root 4096 Jun 16 08:59 ..
>>> Step #0 - "dftester-image": -rw-r--r-- 1 root root    0 Jun 16 08:57
>>> __init__.py
>>> Step #0 - "dftester-image": -rw-r--r-- 1 root root  135 Jun 16 08:57
>>> dataflow_tester.py
>>> Step #0 - "dftester-image": drwxr-xr-x 2 root root 4096 Jun 16 08:59
>>> mypackage
>>> Step #0 - "dftester-image": -rw-r--r-- 1 root root   64 Jun 16 08:57
>>> requirements.txt
>>> Step #0 - "dftester-image": -rw-r--r-- 1 root root  736 Jun 16 08:57
>>> 
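
A minimal sketch of a submission that makes a local package like "mypackage" importable on Dataflow workers: ship it with --setup_file (pointing at a setup.py such as the one sketched above) rather than relying on save_main_session alone, and verify inside the container with: python -c "import mypackage". The project, bucket, and region below are placeholder assumptions:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Hypothetical launcher options; setup_file/save_main_session are the point here.
options = PipelineOptions(
    runner='DataflowRunner',
    project='my-project',            # placeholder
    region='us-central1',            # placeholder
    temp_location='gs://my-bucket/tmp',
    setup_file='./setup.py',         # installs mypackage on each worker
    save_main_session=True,          # pickles the main module's globals
)

with beam.Pipeline(options=options) as p:
    from mypackage import obbutils  # resolves on workers once setup runs
    _ = p | beam.Create([1, 2, 3]) | beam.Map(print)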

Re: Offset access in Kafka messages in Python

2024-03-28 Thread Utkarsh Parekh
Hi Ondrej,

You can retrieve this metadata using an SDF (here is the video from Israel
Herraiz) and the kafka-python Python package.
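
A minimal sketch of that approach, using a plain DoFn rather than a full SDF with restriction tracking, and placeholder servers and topics; each kafka-python ConsumerRecord carries the same metadata that KafkaRecord exposes in Java:

import apache_beam as beam
from kafka import KafkaConsumer, TopicPartition

class ReadWithOffsets(beam.DoFn):
    """Reads one Kafka partition and emits records with their metadata."""

    def __init__(self, bootstrap_servers, topic, max_records=1000):
        self.bootstrap_servers = bootstrap_servers
        self.topic = topic
        self.max_records = max_records

    def process(self, partition_id):
        consumer = KafkaConsumer(bootstrap_servers=self.bootstrap_servers)
        consumer.assign([TopicPartition(self.topic, partition_id)])
        for count, record in enumerate(consumer, start=1):
            yield {
                'topic': record.topic,
                'partition': record.partition,
                'offset': record.offset,   # the CDC ordering key
                'value': record.value,
            }
            if count >= self.max_records:  # bound the read for this sketch
                break
        consumer.close()

with beam.Pipeline() as p:
    _ = (p
         | beam.Create([0])  # one element per partition to read
         | beam.ParDo(ReadWithOffsets('localhost:9092', 'cdc-topic')))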

Utkarsh


On Tue, Mar 26, 2024 at 8:00 AM Ondřej Pánek  wrote:

> Hello team,
>
> Is it possible to somehow retrieve metadata like the topic, partition, and
> offset from the messages consumed from a Kafka source? I mean, is it
> possible to do so in Python? I understand that in Java there is the
> KafkaRecord construct, which offers this metadata, but I haven't found any
> alternative in Python.
>
> The reasoning is that we need to consume CDC data, where the offset
> indicates the message's order, i.e. the order of the CDC operation, and we
> need to store it so we can process it later in BigQuery.
>
> Best regards,
>
> Ondrej
>


Re: Request to join slack channel

2024-02-21 Thread Utkarsh Parekh
Hello,

Can someone add me to the Slack channel as well?

Thanks,
Utkarsh

On Wed, Feb 21, 2024 at 11:10 AM George Dekermenjian 
wrote:

> Me too please - slack channel.
>
>
> On Wed, Feb 21, 2024 at 19:43 Geddy Schellevis 
> wrote:
>
>> Hey Valentyn,
>>
>> Can you add me to the Slack channel as well?
>> Thanks
>>
>>
>> Op wo 21 feb 2024 om 19:21 schreef Valentyn Tymofieiev via user <
>> user@beam.apache.org>
>>
>>> Hi Daniel, I submitted an invitation for your email address as well.
>>>
>>> On Tue, Feb 20, 2024 at 3:56 PM Daniel Chen  wrote:
>>>
 Hey Valentyn, can you add me to slack channel as well?

 On Tue, Feb 20, 2024 at 3:23 PM Valentyn Tymofieiev via user <
 user@beam.apache.org> wrote:

> Hi Lydian,
>
> According to https://infra.apache.org/slack.html, invitations by link
> have been disabled. I submitted an invitation for your email address.
>
> Thanks,
> Valentyn
>
> On Tue, Feb 20, 2024 at 9:33 AM Lydian Lee 
> wrote:
>
>> Hi,
>>
>> Can I get an invitation to join the Slack channel? The ASF Slack seems
>> to require an invitation in order to join. Thanks
>>
>


Re: Beam 2.53.0 Release

2024-01-05 Thread Utkarsh Parekh
Awesome!!

On Fri, Jan 5, 2024 at 2:11 PM XQ Hu via user  wrote:

> Great! And thank you!
>
> On Fri, Jan 5, 2024 at 2:49 PM Jack McCluskey via user <
> user@beam.apache.org> wrote:
>
>> We are happy to present the new 2.53.0 release of Beam.
>> This release includes both improvements and new functionality.
>> For more information on changes in 2.53.0, check out the detailed release
>> notes (https://github.com/apache/beam/milestone/17).
>>
>> Highlights
>> * Python streaming users that use 2.47.0 and newer versions of Beam
>> should update to version 2.53.0, which fixes a known issue (
>> https://github.com/apache/beam/issues/27330)
>>
>> I/Os
>> * TextIO now supports skipping multiple header lines (Java)(
>> https://github.com/apache/beam/issues/17990)
>> * Python GCSIO is now implemented with GCP GCS Client instead of apitools
>> (https://github.com/apache/beam/issues/25676)
>> * Adding support for LowCardinality DataType in ClickHouse (Java) (
>> https://github.com/apache/beam/pull/29533)
>> * Added support for handling bad records to KafkaIO (Java) (
>> https://github.com/apache/beam/pull/29546)
>> * Add support for generating text embeddings in MLTransform for Vertex AI
>> and Hugging Face Hub models. (https://github.com/apache/beam/pull/29564)
>> * NATS IO connector added (Go) (
>> https://github.com/apache/beam/issues/29000)
>>
>> New Features / Improvements
>> * The Python SDK now type checks `collections.abc.Collections` types
>> properly. Some type hints that were erroneously allowed by the SDK may now
>> fail (https://github.com/apache/beam/pull/29272)
>> * Running multi-language pipelines locally no longer requires Docker.
>>   Instead, the same (generally auto-started) subprocess used to perform
>> the
>>   expansion can also be used as the cross-language worker.
>> * Framework for adding Error Handlers to composite transforms added in
>> Java (https://github.com/apache/beam/pull/29164)
>> * Python 3.11 images now include google-cloud-profiler (
>> https://github.com/apache/beam/pull/29651)
>>
>> Breaking Changes
>> * Upgraded to go 1.21.5 to build, fixing CVE-2023-45285 (
>> https://security-tracker.debian.org/tracker/CVE-2023-45285) and
>> CVE-2023-39326 (
>> https://security-tracker.debian.org/tracker/CVE-2023-39326)
>>
>> Deprecations
>> * Euphoria DSL is deprecated and will be removed in a future release (not
>> before 2.56.0) (https://github.com/apache/beam/issues/29451)
>>
>> Bugfixes
>> * (Python) Fixed sporadic crashes in streaming pipelines that affected
>> some users of 2.47.0 and newer SDKs (
>> https://github.com/apache/beam/issues/27330)
>> * (Python) Fixed a bug that caused MLTransform to drop identical elements
>> in the output PCollection (https://github.com/apache/beam/issues/29600)
>>
>> Thanks,
>>
>> Jack McCluskey
>> --
>>
>>
>> Jack McCluskey
>> SWE - DataPLS PLAT/ Dataflow ML
>> RDU
>> jrmcclus...@google.com
>>
>>
>>


Where to specify trust.jks

2023-05-10 Thread Utkarsh Parekh
Hi,

I'm testing a streaming app using Kafka, Dataflow, and Apache Beam
[Python].

 "Error message from worker: org.apache.beam.sdk.util.UserCodeException:
java.lang.RuntimeException: org.apache.kafka.common.KafkaException: Failed
to construct kafka consumer
org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown
Source)
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:888)
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1789)
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:825)
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1789)
org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143)
org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2357)
org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2527)
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78)
org.apache.beam.sdk.io.Read$OutputSingleSource.processElement(Read.java:1018)
org.apache.beam.sdk.io.Read$OutputSingleSource$DoFnInvoker.invokeProcessElement(Unknown
Source)
org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:800)
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:213)
org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.multiplexElements(BeamFnDataInboundObserver2.java:158)
org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:533)
org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)
org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:162)
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
java.base/java.lang.Thread.run(Thread.java:829) Caused by:
java.lang.RuntimeException: org.apache.kafka.common.KafkaException: Failed
to construct kafka consumer
org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:136)
org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:45)
org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceAsSDFRestrictionTracker.initializeCurrentReader(Read.java:843)
org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceAsSDFRestrictionTracker.getProgress(Read.java:975)
org.apache.beam.sdk.transforms.reflect.ByteBuddyDoFnInvokerFactory$DefaultGetSize.invokeGetSize(ByteBuddyDoFnInvokerFactory.java:432)
org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown
Source)
org.apache.beam.fn.harness.FnApiDoFnRunner$SizedRestrictionNonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2319)
org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2527)
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78)
org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:524)
Caused by: org.apache.kafka.common.KafkaException: Failed to construct
kafka consumer
org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:820)
org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:631)
org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:612)
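
On the subject-line question of where trust.jks goes: in the Python SDK the truststore settings travel in KafkaIO's consumer_config, and the file must exist at that path on every worker (for Dataflow, typically baked into a custom container). A hedged sketch; the broker address, path, and password are placeholder assumptions:

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka

with beam.Pipeline() as p:
    _ = (p
         | ReadFromKafka(
             consumer_config={
                 'bootstrap.servers': 'broker:9093',
                 'security.protocol': 'SSL',
                 # Path as seen from inside the worker container:
                 'ssl.truststore.location': '/opt/certs/trust.jks',
                 'ssl.truststore.password': 'changeit',
             },
             topics=['my-topic'])
         | beam.Map(print))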

Apache Beam

2023-05-05 Thread Utkarsh Parekh
Hi,

I'm writing a simple streaming Beam application. The application does the
following:

1. Reads data from a GCS bucket (project 1) and loads it into a Kafka topic
2. Reads data from the Kafka topic and loads it into BigQuery (project 3)

Composer is running in project 1.
Dataflow is running in project 2.

I'm using BeamRunPythonPipelineOperator with a DataflowConfiguration.

Is this the right setup? What would be the gcp_conn_id? Any suggestions?

Utkarsh
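
A hedged sketch of that wiring: gcp_conn_id names the Airflow connection whose service account must be allowed to launch Dataflow jobs in project 2 and to read the GCS bucket in project 1. All IDs and paths below are placeholder assumptions:

from airflow.providers.apache.beam.operators.beam import (
    BeamRunPythonPipelineOperator,
)
from airflow.providers.google.cloud.operators.dataflow import (
    DataflowConfiguration,
)

run_pipeline = BeamRunPythonPipelineOperator(
    task_id='gcs_to_kafka_to_bq',
    py_file='gs://project-1-bucket/pipelines/main.py',
    runner='DataflowRunner',
    pipeline_options={'temp_location': 'gs://project-1-bucket/tmp'},
    dataflow_config=DataflowConfiguration(
        job_name='cdc-load',
        project_id='project-2',               # where Dataflow runs
        location='us-central1',
        gcp_conn_id='google_cloud_default',   # the Airflow connection used
    ),
)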


Re: Running small app using Apache Beam, KafkaIO, Azure EventHub and Databricks Spark

2022-02-01 Thread Utkarsh Parekh
And I also get this error occasionally when I execute a streaming pipeline
on a new cluster instead of an existing cluster:

https://issues.apache.org/jira/browse/BEAM-12032?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel

On Tue, Feb 1, 2022 at 1:59 PM Utkarsh Parekh 
wrote:

> If you tested earlier with the same stack, which version did you use?
>
> *Can you enable debug logs to check what’s happening there? *So far the
> following warning is all I received from log4j on Databricks (no errors
> other than that).
>
> *Can you make sure that there is no issue with firewall or something? *No,
> I don't think so, because it works fine locally and in a Databricks notebook.
>
> *Can you run this pipeline locally against a real Kafka server, not Azure
> Event Hub, to make sure that it works fine? *Yes, it works fine with
> both Azure Event Hub and Kafka.
>
>
>
> org.springframework.core.convert.support.DefaultConversionService.getSharedInstance()'
> at org.springframework.expression.spel.support.StandardTypeConverter.<init>(StandardTypeConverter.java:46)
> at org.springframework.expression.spel.support.StandardEvaluationContext.getTypeConverter(StandardEvaluationContext.java:197)
> at org.springframework.expression.spel.support.ReflectiveMethodResolver.resolve(ReflectiveMethodResolver.java:115)
> at org.springframework.expression.spel.ast.MethodReference.findAccessorForMethod(MethodReference.java:201)
> at org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:130)
> at org.springframework.expression.spel.ast.MethodReference.access$000(MethodReference.java:52)
> at org.springframework.expression.spel.ast.MethodReference$MethodValueRef.getValue(MethodReference.java:377)
> at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:88)
> at org.springframework.expression.spel.ast.SpelNodeImpl.getValue(SpelNodeImpl.java:121)
> at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:262)
> at org.apache.beam.sdk.io.kafka.ConsumerSpEL.evaluateAssign(ConsumerSpEL.java:124)
> at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.start(KafkaUnboundedReader.java:85)
> at org.apache.beam.runners.spark.io.MicrobatchSource$Reader.startIfNeeded(MicrobatchSource.java:207)
> at org.apache.beam.runners.spark.io.MicrobatchSource$Reader.start(MicrobatchSource.java:227)
> at org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:172)
> at org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:111)
> at org.apache.spark.streaming.StateSpec$.$anonfun$function$1(StateSpec.scala:181)
> at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.$anonfun$updateRecordWithData$3(MapWithStateRDD.scala:57)
> at scala.collection.Iterator.foreach(Iterator.scala:943)
> at scala.collection.Iterator.foreach$(Iterator.scala:943)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
> at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
> at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:159)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
> at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:393)
> at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1486)
> at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1413)
> at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1477)
> at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1296)
> at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:391)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:342)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
> at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
> at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
> at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
> at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
> at org.apache.spark.scheduler.ResultT

Re: Running small app using Apache Beam, KafkaIO, Azure EventHub and Databricks Spark

2022-02-01 Thread Utkarsh Parekh
(Executor.scala:683)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)

On Tue, Feb 1, 2022 at 12:07 PM Alexey Romanenko 
wrote:

> Well, personally I didn’t test with this version, but it should be fine…
> Can you enable debug logs to check what’s happening there?
> Can you make sure that there is no issue with firewall or something?
> Can you run this pipeline locally against a real Kafka server, not Azure
> Event Hub, to make sure that it works fine?
> Otherwise, you would need to remotely debug the worker process.
>
> On 1 Feb 2022, at 19:18, Utkarsh Parekh 
> wrote:
>
> Sorry, I sent the last message in a hurry. Here is the Beam Java Kafka
> dependency: is something missing here?
>
> 
> <dependency>
>   <groupId>org.apache.beam</groupId>
>   <artifactId>beam-sdks-java-io-kafka</artifactId>
>   <version>2.35.0</version>
> </dependency>
> 
>
>
> On Tue, Feb 1, 2022 at 9:01 AM Utkarsh Parekh 
> wrote:
>
>> Here it is
>>
>> 
>> <dependency>
>>   <groupId>org.apache.kafka</groupId>
>>   <artifactId>kafka-clients</artifactId>
>>   <version>2.8.0</version>
>> </dependency>
>> 
>>
>>
>> On Tue, Feb 1, 2022 at 8:53 AM Alexey Romanenko 
>> wrote:
>>
>>> Hmm, this is strange. Which version of Kafka client do you use while
>>> running it with Beam?
>>>
>>> On 1 Feb 2022, at 16:56, Utkarsh Parekh 
>>> wrote:
>>>
>>> Hi Alexey,
>>>
>>> First of all, thank you for the response! Yes I did have it in Consumer
>>> configuration and try to increase "session.timeout".
>>>
>>> From consumer side so far I've following settings:
>>>
>>> props.put("sasl.mechanism", SASL_MECHANISM);
>>> props.put("security.protocol", SECURITY_PROTOCOL);
>>> props.put("sasl.jaas.config", saslJaasConfig);
>>> props.put("request.timeout.ms", 6);
>>> props.put("session.timeout.ms", 6);
>>> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
>>> AUTO_OFFSET_RESET_CONFIG);
>>> props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
>>>
>>>
>>> It works fine using following code in Databricks Notebook. The problem
>>> has been occurring when I run it through Apache beam and KafkaIO (Just
>>> providing more context if that may help you to understand problem)
>>>
>>> val df = spark.readStream
>>> .format("kafka")
>>> .option("subscribe", TOPIC)
>>> .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
>>> .option("kafka.sasl.mechanism", "PLAIN")
>>> .option("kafka.security.protocol", "SASL_SSL")
>>> .option("kafka.sasl.jaas.config", EH_SASL)
>>> .option("kafka.request.timeout.ms", "6")
>>> .option("kafka.session.timeout.ms", "6")
>>> .option("failOnDataLoss", "false")
>>> //.option("kafka.group.id", "testsink")
>>> .option("startingOffsets", "latest")
>>> .load()
>>>
>>> Utkarsh
>>>
>>> On Tue, Feb 1, 2022 at 6:20 AM Alexey Romanenko <
>>> aromanenko@gmail.com> wrote:
>>>
>>>> Hi Utkarsh,
>>>>
>>>> Can it be related to this configuration problem?
>>>>
>>>> https://docs.microsoft.com/en-us/azure/event-hubs/apache-kafka-troubleshooting-guide#no-records-received
>>>>
>>>> Did you check timeout settings?
>>>>
>>>> —
>>>> Alexey
>>>>
>>>>
>>>> On 1 Feb 2022, at 02:27, Utkarsh Parekh 
>>>> wrote:
>>>>
>>>> Hello,
>>>>
>>>> I'm doing POC with KafkaIO and spark runner on Azure Databricks. I'm
>>>> trying to create a simple streaming app with Apache Beam, where it reads
>>>> data from an Azure event hub and produces messages into another Azure event
>>>> hub.
>>>>
>>>> I'm creating and running spark jobs on Azure Databricks.
>>>>
>>>> The problem is the consumer (uses SparkRunner) is not able to read data
>>>> from Event hub (queue). There is no activity and no errors on the Spark
>>>> cluster.
>>>>
>>>> I would appreciate it if anyone could help to fix this issue.
>>>>
>>>> Thank you
>>>>
>>>> Utkarsh
>>>>
>>>>
>>>>
>>>
>


Re: Running small app using Apache Beam, KafkaIO, Azure EventHub and Databricks Spark

2022-02-01 Thread Utkarsh Parekh
Sorry, I sent the last message in a hurry. Here is the Beam Java Kafka
dependency: is something missing here?


<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-kafka</artifactId>
  <version>2.35.0</version>
</dependency>



On Tue, Feb 1, 2022 at 9:01 AM Utkarsh Parekh 
wrote:

> Here it is
>
> 
> <dependency>
>   <groupId>org.apache.kafka</groupId>
>   <artifactId>kafka-clients</artifactId>
>   <version>2.8.0</version>
> </dependency>
> 
>
>
> On Tue, Feb 1, 2022 at 8:53 AM Alexey Romanenko 
> wrote:
>
>> Hmm, this is strange. Which version of Kafka client do you use while
>> running it with Beam?
>>
>> On 1 Feb 2022, at 16:56, Utkarsh Parekh 
>> wrote:
>>
>> Hi Alexey,
>>
>> First of all, thank you for the response! Yes I did have it in Consumer
>> configuration and try to increase "session.timeout".
>>
>> From consumer side so far I've following settings:
>>
>> props.put("sasl.mechanism", SASL_MECHANISM);
>> props.put("security.protocol", SECURITY_PROTOCOL);
>> props.put("sasl.jaas.config", saslJaasConfig);
>> props.put("request.timeout.ms", 6);
>> props.put("session.timeout.ms", 6);
>> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET_CONFIG);
>> props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
>>
>>
>> It works fine using following code in Databricks Notebook. The problem
>> has been occurring when I run it through Apache beam and KafkaIO (Just
>> providing more context if that may help you to understand problem)
>>
>> val df = spark.readStream
>> .format("kafka")
>> .option("subscribe", TOPIC)
>> .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
>> .option("kafka.sasl.mechanism", "PLAIN")
>> .option("kafka.security.protocol", "SASL_SSL")
>> .option("kafka.sasl.jaas.config", EH_SASL)
>> .option("kafka.request.timeout.ms", "6")
>> .option("kafka.session.timeout.ms", "6")
>> .option("failOnDataLoss", "false")
>> //.option("kafka.group.id", "testsink")
>> .option("startingOffsets", "latest")
>> .load()
>>
>> Utkarsh
>>
>> On Tue, Feb 1, 2022 at 6:20 AM Alexey Romanenko 
>> wrote:
>>
>>> Hi Utkarsh,
>>>
>>> Can it be related to this configuration problem?
>>>
>>> https://docs.microsoft.com/en-us/azure/event-hubs/apache-kafka-troubleshooting-guide#no-records-received
>>>
>>> Did you check timeout settings?
>>>
>>> —
>>> Alexey
>>>
>>>
>>> On 1 Feb 2022, at 02:27, Utkarsh Parekh 
>>> wrote:
>>>
>>> Hello,
>>>
>>> I'm doing POC with KafkaIO and spark runner on Azure Databricks. I'm
>>> trying to create a simple streaming app with Apache Beam, where it reads
>>> data from an Azure event hub and produces messages into another Azure event
>>> hub.
>>>
>>> I'm creating and running spark jobs on Azure Databricks.
>>>
>>> The problem is the consumer (uses SparkRunner) is not able to read data
>>> from Event hub (queue). There is no activity and no errors on the Spark
>>> cluster.
>>>
>>> I would appreciate it if anyone could help to fix this issue.
>>>
>>> Thank you
>>>
>>> Utkarsh
>>>
>>>
>>>
>>


Re: Running small app using Apache Beam, KafkaIO, Azure EventHub and Databricks Spark

2022-02-01 Thread Utkarsh Parekh
Here it is


<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>2.8.0</version>
</dependency>



On Tue, Feb 1, 2022 at 8:53 AM Alexey Romanenko 
wrote:

> Hmm, this is strange. Which version of Kafka client do you use while
> running it with Beam?
>
> On 1 Feb 2022, at 16:56, Utkarsh Parekh 
> wrote:
>
> Hi Alexey,
>
> First of all, thank you for the response! Yes I did have it in Consumer
> configuration and try to increase "session.timeout".
>
> From consumer side so far I've following settings:
>
> props.put("sasl.mechanism", SASL_MECHANISM);
> props.put("security.protocol", SECURITY_PROTOCOL);
> props.put("sasl.jaas.config", saslJaasConfig);
> props.put("request.timeout.ms", 6);
> props.put("session.timeout.ms", 6);
> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET_CONFIG);
> props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
>
>
> It works fine using following code in Databricks Notebook. The problem has
> been occurring when I run it through Apache beam and KafkaIO (Just
> providing more context if that may help you to understand problem)
>
> val df = spark.readStream
> .format("kafka")
> .option("subscribe", TOPIC)
> .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
> .option("kafka.sasl.mechanism", "PLAIN")
> .option("kafka.security.protocol", "SASL_SSL")
> .option("kafka.sasl.jaas.config", EH_SASL)
> .option("kafka.request.timeout.ms", "6")
> .option("kafka.session.timeout.ms", "6")
> .option("failOnDataLoss", "false")
> //.option("kafka.group.id", "testsink")
> .option("startingOffsets", "latest")
>     .load()
>
> Utkarsh
>
> On Tue, Feb 1, 2022 at 6:20 AM Alexey Romanenko 
> wrote:
>
>> Hi Utkarsh,
>>
>> Can it be related to this configuration problem?
>>
>> https://docs.microsoft.com/en-us/azure/event-hubs/apache-kafka-troubleshooting-guide#no-records-received
>>
>> Did you check timeout settings?
>>
>> —
>> Alexey
>>
>>
>> On 1 Feb 2022, at 02:27, Utkarsh Parekh 
>> wrote:
>>
>> Hello,
>>
>> I'm doing POC with KafkaIO and spark runner on Azure Databricks. I'm
>> trying to create a simple streaming app with Apache Beam, where it reads
>> data from an Azure event hub and produces messages into another Azure event
>> hub.
>>
>> I'm creating and running spark jobs on Azure Databricks.
>>
>> The problem is the consumer (uses SparkRunner) is not able to read data
>> from Event hub (queue). There is no activity and no errors on the Spark
>> cluster.
>>
>> I would appreciate it if anyone could help to fix this issue.
>>
>> Thank you
>>
>> Utkarsh
>>
>>
>>
>


Re: Running small app using Apache Beam, KafkaIO, Azure EventHub and Databricks Spark

2022-02-01 Thread Utkarsh Parekh
Hi Alexey,

First of all, thank you for the response! Yes, I did have it in the consumer
configuration and tried to increase "session.timeout".

From the consumer side, I have the following settings so far:

props.put("sasl.mechanism", SASL_MECHANISM);
props.put("security.protocol", SECURITY_PROTOCOL);
props.put("sasl.jaas.config", saslJaasConfig);
props.put("request.timeout.ms", 6);
props.put("session.timeout.ms", 6);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET_CONFIG);
props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
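
For comparison, a hedged sketch of how these settings would be handed to Beam's KafkaIO; in Java they go through KafkaIO.read().withConsumerConfigUpdates(...), shown here via the Python wrapper's consumer_config for brevity. The namespace, topic, timeouts, and connection string are placeholder assumptions:

from apache_beam.io.kafka import ReadFromKafka

# Standard Event Hubs Kafka-endpoint JAAS string (connection string redacted).
EH_SASL = (
    'org.apache.kafka.common.security.plain.PlainLoginModule required '
    'username="$ConnectionString" password="<EVENT-HUBS-CONNECTION-STRING>";'
)

read = ReadFromKafka(
    consumer_config={
        'bootstrap.servers': 'mynamespace.servicebus.windows.net:9093',
        'sasl.mechanism': 'PLAIN',
        'security.protocol': 'SASL_SSL',
        'sasl.jaas.config': EH_SASL,
        'request.timeout.ms': '60000',   # placeholder timeout
        'session.timeout.ms': '60000',   # placeholder timeout
        'auto.offset.reset': 'latest',
        'group.id': '$Default',          # Event Hubs' default consumer group
    },
    topics=['my-topic'],
)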


It works fine using the following code in a Databricks notebook. The problem
occurs when I run it through Apache Beam and KafkaIO (just providing more
context in case it helps you understand the problem).

val df = spark.readStream
.format("kafka")
.option("subscribe", TOPIC)
.option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.jaas.config", EH_SASL)
.option("kafka.request.timeout.ms", "6")
.option("kafka.session.timeout.ms", "6")
.option("failOnDataLoss", "false")
//.option("kafka.group.id", "testsink")
.option("startingOffsets", "latest")
.load()

Utkarsh

On Tue, Feb 1, 2022 at 6:20 AM Alexey Romanenko 
wrote:

> Hi Utkarsh,
>
> Can it be related to this configuration problem?
>
> https://docs.microsoft.com/en-us/azure/event-hubs/apache-kafka-troubleshooting-guide#no-records-received
>
> Did you check timeout settings?
>
> —
> Alexey
>
>
> On 1 Feb 2022, at 02:27, Utkarsh Parekh 
> wrote:
>
> Hello,
>
> I'm doing POC with KafkaIO and spark runner on Azure Databricks. I'm
> trying to create a simple streaming app with Apache Beam, where it reads
> data from an Azure event hub and produces messages into another Azure event
> hub.
>
> I'm creating and running spark jobs on Azure Databricks.
>
> The problem is the consumer (uses SparkRunner) is not able to read data
> from Event hub (queue). There is no activity and no errors on the Spark
> cluster.
>
> I would appreciate it if anyone could help to fix this issue.
>
> Thank you
>
> Utkarsh
>
>
>


Running small app using Apache Beam, KafkaIO, Azure EventHub and Databricks Spark

2022-01-31 Thread Utkarsh Parekh
Hello,

I'm doing a POC with KafkaIO and the Spark runner on Azure Databricks. I'm
trying to create a simple streaming app with Apache Beam that reads data
from one Azure Event Hub and produces messages into another Azure Event Hub.

I'm creating and running the Spark jobs on Azure Databricks.

The problem is that the consumer (using SparkRunner) is not able to read
data from the Event Hub (queue). There is no activity and no errors on the
Spark cluster.

I would appreciate it if anyone could help to fix this issue.

Thank you

Utkarsh