Re: Does withkeys transform enforce a reshuffle?

2024-01-19 Thread hsy...@gmail.com
Also, I looked at the code; Reshuffle seems to do some group-by work
internally, but I don't really need a group-by.

On Fri, Jan 19, 2024 at 9:35 AM hsy...@gmail.com  wrote:

> ReShuffle is deprecated
>
> On Fri, Jan 19, 2024 at 8:25 AM XQ Hu via user 
> wrote:
>
>> I do not think it enforces a reshuffle by just checking the doc here:
>> https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html?highlight=withkeys#apache_beam.transforms.util.WithKeys
>>
>> Have you tried to just add ReShuffle after PubsubLiteIO?
>>
>> On Thu, Jan 18, 2024 at 8:54 PM hsy...@gmail.com 
>> wrote:
>>
>>> Hey guys,
>>>
>>> I have a question, does withkeys transformation enforce a reshuffle?
>>>
>>> My pipeline basically look like this PubsubLiteIO -> ParDo(..) ->
>>> ParDo() -> BigqueryIO.write()
>>>
>>> The problem is PubsubLiteIO -> ParDo(..) -> ParDo() always fused
>>> together. But The ParDo is expensive and I want dataflow to have more
>>> workers to work on that, what's the best way to do that?
>>>
>>> Regards,
>>>
>>>


Re: Does withkeys transform enforce a reshuffle?

2024-01-19 Thread hsy...@gmail.com
ReShuffle is deprecated

On Fri, Jan 19, 2024 at 8:25 AM XQ Hu via user  wrote:

> I do not think it enforces a reshuffle by just checking the doc here:
> https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html?highlight=withkeys#apache_beam.transforms.util.WithKeys
>
> Have you tried to just add ReShuffle after PubsubLiteIO?
>
> On Thu, Jan 18, 2024 at 8:54 PM hsy...@gmail.com  wrote:
>
>> Hey guys,
>>
>> I have a question, does withkeys transformation enforce a reshuffle?
>>
>> My pipeline basically look like this PubsubLiteIO -> ParDo(..) -> ParDo()
>> -> BigqueryIO.write()
>>
>> The problem is PubsubLiteIO -> ParDo(..) -> ParDo() always fused
>> together. But The ParDo is expensive and I want dataflow to have more
>> workers to work on that, what's the best way to do that?
>>
>> Regards,
>>
>>


Does withkeys transform enforce a reshuffle?

2024-01-18 Thread hsy...@gmail.com
Hey guys,

I have a question: does the WithKeys transform enforce a reshuffle?

My pipeline basically looks like this: PubsubLiteIO -> ParDo(..) -> ParDo()
-> BigQueryIO.write()

The problem is that PubsubLiteIO -> ParDo(..) -> ParDo() always gets fused
together, but the ParDo is expensive and I want Dataflow to put more workers
on it. What's the best way to do that?

Regards,
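
For reference on the fusion question: WithKeys on its own is just an
element-wise mapping, so it does not create a fusion break by itself; what
prevents fusion is a shuffle step between the source and the expensive
ParDo, e.g. Reshuffle.viaRandomKey() (still the common approach even though
the transform is marked deprecated in the SDK). A minimal sketch in the
Beam Java SDK, where the source and DoFns are placeholders rather than the
pipeline from this thread:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;

public class FusionBreakSketch {

  // Hypothetical stand-in for the expensive ParDo in the question.
  static class ExpensiveFn extends DoFn<String, String> {
    @ProcessElement
    public void process(@Element String element, OutputReceiver<String> out) {
      out.output(element.toUpperCase()); // pretend this is CPU-heavy work
    }
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    p.apply("Source", Create.of("a", "b", "c")) // placeholder for PubsubLiteIO.read(...)
        // Reshuffle.viaRandomKey() assigns random keys, groups, and drops the keys
        // again; the group-by exists only to create a shuffle boundary, which stops
        // the source and the expensive ParDo from being fused into a single stage.
        .apply("BreakFusion", Reshuffle.viaRandomKey())
        .apply("ExpensiveStep", ParDo.of(new ExpensiveFn()));

    p.run().waitUntilFinish();
  }
}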


ParDo(DoFn) with multiple context.output vs FlatMapElements

2023-12-27 Thread hsy...@gmail.com
Hello

I have a question. I have a transform that, for each input, emits one or
many outputs (into the same collection). I can do this with ParDo + DoFn,
calling context.output multiple times in processElement for each input, or
with FlatMapElements; is there any difference? Does Dataflow fuse the
downstream transform automatically? Eventually I want more workers for the
downstream transform because it needs to handle more data. How am I
supposed to do that?

Regards,
Siyuan
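
For comparison, a minimal sketch of the two equivalent ways to emit zero or
more outputs per input element in the Beam Java SDK (the element types and
splitting logic are made up for illustration). Functionally they do the same
thing, and neither one by itself changes how the runner fuses the downstream
transform:

import java.util.Arrays;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

public class OneToManySketch {

  // Option 1: ParDo + DoFn, calling output() as many times as needed per input.
  static class SplitFn extends DoFn<String, String> {
    @ProcessElement
    public void process(@Element String line, OutputReceiver<String> out) {
      for (String word : line.split(" ")) {
        out.output(word); // one input element, many output elements
      }
    }
  }

  static PCollection<String> withParDo(PCollection<String> lines) {
    return lines.apply("SplitWithParDo", ParDo.of(new SplitFn()));
  }

  // Option 2: FlatMapElements with a lambda returning an Iterable.
  static PCollection<String> withFlatMap(PCollection<String> lines) {
    return lines.apply(
        "SplitWithFlatMap",
        FlatMapElements.into(TypeDescriptors.strings())
            .via((String line) -> Arrays.asList(line.split(" "))));
  }
}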


pubsubliteio ack problem

2023-12-21 Thread hsy...@gmail.com
In my application, PubsubLiteIO never seems to ack the messages and the data
lateness keeps building up forever. My question is: how does Dataflow know
when to ack a message? How does the engine even know when it has been
processed?


How to set flow control for pubsubliteio?

2023-12-20 Thread hsy...@gmail.com
How do I change the flow control config for PubsubLiteIO?

I saw the setting was taken out as part of
https://issues.apache.org/jira/browse/BEAM-14129

But without flow control set up correctly, my Beam app runs super slow
ingesting from Pub/Sub Lite and gets NO_CLIENT_TOKEN errors on the server
side, which suggests increasing the flow control settings.


Re: pubsubliteio is super slow

2023-12-19 Thread hsy...@gmail.com
Do you have a ticket?

And the Pub/Sub Lite metrics show NO_CLIENT_TOKENS.

On Tue, Dec 19, 2023 at 1:39 PM Nirav Patel  wrote:

> we have. yes it is super slow.  I tested python, java IO version as well
> besides beam IO. we reported to google about this problem.
>
> On Tue, Dec 19, 2023 at 10:17 AM hsy...@gmail.com 
> wrote:
>
>> Any one is using pubsublite? I find it super slow 5 messages/sec and
>> there is no options for me to tune the performance
>>
>


pubsubliteio is super slow

2023-12-19 Thread hsy...@gmail.com
Is anyone using Pub/Sub Lite? I find it super slow (5 messages/sec) and
there are no options for me to tune the performance.


Re: Questions about writing to BigQuery using storage api

2023-12-07 Thread hsy...@gmail.com
The "Caused by" is in the error message.

On Thu, Dec 7, 2023 at 10:47 AM Reuven Lax via user 
wrote:

> This is the stack trace of the rethrown exception. The log should also
> contain a "caused by" log somewhere detailing the original exception. Do
> you happen to have that?
>
> On Thu, Dec 7, 2023 at 8:46 AM hsy...@gmail.com  wrote:
>
>> Here is the complete stacktrace  It doesn't even hit my code and it
>> happens consistently!
>>
>> Error message from worker: java.lang.RuntimeException:
>> java.lang.IllegalStateException
>> org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.lambda$process$12(StorageApiWritesShardedRecords.java:573)
>> org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.process(StorageApiWritesShardedRecords.java:790)
>> Caused by: java.lang.IllegalStateException
>> org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:496)
>> org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl$1.pin(BigQueryServicesImpl.java:1403)
>> org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.lambda$process$12(StorageApiWritesShardedRecords.java:565)
>> org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.process(StorageApiWritesShardedRecords.java:790)
>> org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn$DoFnInvoker.invokeProcessElement(Unknown
>> Source)
>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:211)
>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:185)
>> org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
>> org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
>> org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:54)
>> org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:285)
>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:275)
>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:85)
>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.outputWithTimestamp(SimpleDoFnRunner.java:430)
>> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.outputWithTimestamp(DoFnOutputReceivers.java:85)
>> org.apache.beam.sdk.transforms.GroupIntoBatches$GroupIntoBatchesDoFn.flushBatch(GroupIntoBatches.java:660)
>> org.apache.beam.sdk.transforms.GroupIntoBatches$GroupIntoBatchesDoFn.processElement(GroupIntoBatches.java:518)
>> org.apache.beam.sdk.transforms.GroupIntoBatches$GroupIntoBatchesDoFn$DoFnInvoker.invokeProcessElement(Unknown
>> Source)
>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:211)
>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:185)
>> org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
>> org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
>> org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:54)
>> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:218)
>> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:169)
>> org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
>> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1433)
>> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$800(StreamingDataflowWorker.java:155)
>> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$4.run(StreamingDataflowWorker.java:1056)
>> org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor.lambda$executeLockHeld$0(BoundedQueueExecutor.java:163)
>> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)

Re: Questions about writing to BigQuery using storage api

2023-12-07 Thread hsy...@gmail.com
Here is the complete stacktrace. It doesn't even hit my code, and it happens
consistently!

Error message from worker: java.lang.RuntimeException:
java.lang.IllegalStateException
org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.lambda$process$12(StorageApiWritesShardedRecords.java:573)
org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.process(StorageApiWritesShardedRecords.java:790)
Caused by: java.lang.IllegalStateException
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:496)
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl$1.pin(BigQueryServicesImpl.java:1403)
org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.lambda$process$12(StorageApiWritesShardedRecords.java:565)
org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.process(StorageApiWritesShardedRecords.java:790)
org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn$DoFnInvoker.invokeProcessElement(Unknown
Source)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:211)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:185)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:54)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:285)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:275)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:85)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.outputWithTimestamp(SimpleDoFnRunner.java:430)
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.outputWithTimestamp(DoFnOutputReceivers.java:85)
org.apache.beam.sdk.transforms.GroupIntoBatches$GroupIntoBatchesDoFn.flushBatch(GroupIntoBatches.java:660)
org.apache.beam.sdk.transforms.GroupIntoBatches$GroupIntoBatchesDoFn.processElement(GroupIntoBatches.java:518)
org.apache.beam.sdk.transforms.GroupIntoBatches$GroupIntoBatchesDoFn$DoFnInvoker.invokeProcessElement(Unknown
Source)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:211)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:185)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:54)
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:218)
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:169)
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1433)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$800(StreamingDataflowWorker.java:155)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$4.run(StreamingDataflowWorker.java:1056)
org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor.lambda$executeLockHeld$0(BoundedQueueExecutor.java:163)
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
java.base/java.lang.Thread.run(Thread.java:834)

Regards,
Siyuan

On Wed, Dec 6, 2023 at 10:12 AM Ahmed Abualsaud 
wrote:

> Hey, can you provide the full stack trace for the error you're seeing?
> Also is this happening consistently?
>
> *+1* to raising a Google ticket where we'll have more visibility.
>
> On Wed, Dec 6, 2023 at 11:33 AM John Casey 
> wrote:
>
>> Hmm. It may be best if you raise a ticket with Google support for this. I
>> can inspect your job directly if you do that, and that will make this more
>> straightforward.
>>
>> On Wed, Dec 6, 2023 at 11:24 AM hsy...@gmail.com 
>> wrote:
>>
>>> I’m just using dataflow engine
>>> On Wed, Dec 6, 202

Re: Questions about writing to BigQuery using storage api

2023-12-06 Thread hsy...@gmail.com
I'm just using the Dataflow engine.
On Wed, Dec 6, 2023 at 08:23 John Casey via user 
wrote:

> Well, that is odd. It looks like the underlying client is closed, which is
> unexpected.
>
> Do you see any retries in your pipeline? Also, what runner are you using?
>
> @Ahmed Abualsaud  this might be interesting to
> you too
>
> On Tue, Dec 5, 2023 at 9:39 PM hsy...@gmail.com  wrote:
>
>> I'm using version 2.51.0 and The configuration is like this
>>
>> write
>> .withoutValidation()
>> .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
>> .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>> .withExtendedErrorInfo()
>> .withMethod(Write.Method.STORAGE_WRITE_API)
>> .withTriggeringFrequency(Duration.standardSeconds(10))
>> .withAutoSharding().optimizedWrites()
>> .withFailedInsertRetryPolicy(retryTransientErrors());
>>
>>
>> On Tue, Dec 5, 2023 at 11:20 AM John Casey via user 
>> wrote:
>>
>>> Hi,
>>>
>>> Could you add some more detail? Which beam version are you using?
>>>
>>>
>>> On Tue, Dec 5, 2023 at 1:52 PM hsy...@gmail.com 
>>> wrote:
>>>
>>>> Any one has experience in writing to BQ using storage api
>>>>
>>>> I tried to use it because according to the document it is more efficient
>>>> but I got error below
>>>>
>>>> 2023-12-05 04:01:29.741 PST
>>>> Error message from worker: java.lang.RuntimeException:
>>>> java.lang.IllegalStateException
>>>> org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.lambda$process$12(StorageApiWritesShardedRecords.java:573)
>>>> org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.process(StorageApiWritesShardedRecords.java:790)
>>>> Caused by: java.lang.IllegalStateException
>>>> org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:496)
>>>> org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl$1.pin(BigQueryServicesImpl.java:1403)
>>>> org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.lambda$process$12(StorageApiWritesShardedRecords.java:565)
>>>> org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.process(StorageApiWritesShardedRecords.java:790)
>>>> org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn$DoFnInvoker.invokeProcessElement(Unknown
>>>> Source)
>>>>
>>>


Re: Questions about writing to BigQuery using storage api

2023-12-05 Thread hsy...@gmail.com
I'm using version 2.51.0 and the configuration is like this:

write
.withoutValidation()
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withExtendedErrorInfo()
.withMethod(Write.Method.STORAGE_WRITE_API)
.withTriggeringFrequency(Duration.standardSeconds(10))
.withAutoSharding().optimizedWrites()
.withFailedInsertRetryPolicy(retryTransientErrors());


On Tue, Dec 5, 2023 at 11:20 AM John Casey via user 
wrote:

> Hi,
>
> Could you add some more detail? Which beam version are you using?
>
>
> On Tue, Dec 5, 2023 at 1:52 PM hsy...@gmail.com  wrote:
>
>> Any one has experience in writing to BQ using storage api
>>
>> I tried to use it because according to the document it is more efficient
>> but I got error below
>>
>> 2023-12-05 04:01:29.741 PST
>> Error message from worker: java.lang.RuntimeException:
>> java.lang.IllegalStateException
>> org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.lambda$process$12(StorageApiWritesShardedRecords.java:573)
>> org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.process(StorageApiWritesShardedRecords.java:790)
>> Caused by: java.lang.IllegalStateException
>> org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:496)
>> org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl$1.pin(BigQueryServicesImpl.java:1403)
>> org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.lambda$process$12(StorageApiWritesShardedRecords.java:565)
>> org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.process(StorageApiWritesShardedRecords.java:790)
>> org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn$DoFnInvoker.invokeProcessElement(Unknown
>> Source)
>>
>


Questions about writing to BigQuery using storage api

2023-12-05 Thread hsy...@gmail.com
Does anyone have experience writing to BQ using the Storage API?

I tried to use it because, according to the documentation, it is more
efficient, but I got the error below.

2023-12-05 04:01:29.741 PST
Error message from worker: java.lang.RuntimeException:
java.lang.IllegalStateException
org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.lambda$process$12(StorageApiWritesShardedRecords.java:573)
org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.process(StorageApiWritesShardedRecords.java:790)
Caused by: java.lang.IllegalStateException
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:496)
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl$1.pin(BigQueryServicesImpl.java:1403)
org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.lambda$process$12(StorageApiWritesShardedRecords.java:565)
org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.process(StorageApiWritesShardedRecords.java:790)
org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn$DoFnInvoker.invokeProcessElement(Unknown
Source)


Watermark never progress for deduplicate transform

2023-09-06 Thread hsy...@gmail.com
Hello,

I'm using the
https://beam.apache.org/releases/javadoc/2.21.0/org/apache/beam/sdk/transforms/Deduplicate.html
transform to help dedup my data, but on the monitoring page I see that the
watermark is not moving forward. Is that common for this transform?

Thanks
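
For reference, a minimal usage sketch of the transform linked above; the
pipeline types and the 10-minute duration are made up, and the withDuration
call reflects my reading of the 2.21.0 API. Deduplicate keeps per-key state
plus timers to expire it, which is why its behavior is tied to the
pipeline's time domain and watermark:

import org.apache.beam.sdk.transforms.Deduplicate;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

public class DedupSketch {
  // Drops values that re-appear within the configured duration.
  static PCollection<String> dedup(PCollection<String> events) {
    return events.apply(
        "DedupEvents",
        Deduplicate.<String>values().withDuration(Duration.standardMinutes(10)));
  }
}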


Question about metrics

2023-05-12 Thread hsy...@gmail.com
Hi, I have a question about metrics. I want to use the Beam metrics API to
send metrics to GCP Monitoring. Instead of collecting just simple numeric
values, I also need to send labels along with them. Is there a way to do
that? Thanks!
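
For context, the core Beam metrics API only takes a namespace and a name;
there is no parameter for arbitrary labels, so extra dimensions usually end
up encoded in the metric name or attached outside Beam. A minimal sketch of
that API as used from a DoFn (the namespace, metric names, and DoFn are
illustrative, not from an existing pipeline):

import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;

// Hypothetical DoFn showing the metrics API surface: namespace + name, no label map.
class TrackedFn extends DoFn<String, String> {
  private final Counter processed = Metrics.counter("my_app", "elements_processed");
  private final Distribution sizes = Metrics.distribution("my_app", "element_size_bytes");

  @ProcessElement
  public void process(@Element String element, OutputReceiver<String> out) {
    processed.inc();                 // simple numeric value
    sizes.update(element.length());  // distribution (min/max/sum/count)
    out.output(element);
  }
}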


Streaming write to orc problem

2022-04-22 Thread hsy...@gmail.com
Hello all,

I'm just trying to build a pipeline that reads data from a streaming source
and writes to ORC files, but I don't see any files written to the file
system, nor any exceptions.

Here is an example:

val df = spark.readStream.format("...")
  .option(
    "Topic",
    "Some topic"
  )
  .load()
val q = df.writeStream.format("orc").option("path", "gs://testdata/raw")
  .option("checkpointLocation", "gs://testdata/raw_chk")
  .trigger(Trigger.ProcessingTime(5, TimeUnit.SECONDS))
  .start
q.awaitTermination(120)
q.stop()


I couldn't find any files until the 1200 seconds were over.
Does that mean all the data is cached in memory? If I keep the pipeline
running, I see no files being flushed to the file system.

How do I control how often Spark streaming writes to disk?

Thanks!


I got weird error from a join

2018-02-21 Thread hsy...@gmail.com
from pyspark.sql import Row

A_DF = sc.parallelize(
[
Row(id='A123', name='a1'),
Row(id='A234', name='a2')
]).toDF()

B_DF = sc.parallelize(
[
Row(id='A123', pid='A234', ename='e1')
]).toDF()
join_df = B_DF.join(A_DF, B_DF.id==A_DF.id).drop(B_DF.id)
final_join = join_df.join(A_DF, join_df.pid==A_DF.id)
final_join.collect()

what I expect is A123, a1, A234, a2, e1

It works by simply changing one line to:


A_DF = sc.parallelize(
[
Row(id='A123', name='a1'),
Row(id='A234', name='a2')
]).toDF()

B_DF = sc.parallelize(
[
Row(id='A123', pid='A234', ename='e1')
]).toDF()
join_df = B_DF.join(A_DF, B_DF.id==A_DF.id).drop(B_DF.id)
final_join = A_DF.join(join_df, join_df.pid==A_DF.id)
final_join.collect()


Does anyone have any idea why that is?


Questions about using pyspark 2.1.1 pushing data to kafka

2018-01-23 Thread hsy...@gmail.com
I have questions about using pyspark 2.1.1 to push data to Kafka.

I don't see any pyspark streaming API to write data directly to Kafka; if
there is one, or an example, please point me to the right page.

I implemented my own way, which uses a global Kafka producer and pushes the
data picked up in foreach. The problem is that foreach is a single-threaded
model, so I can only do synchronous pushes; otherwise there is no way to
know whether the data was pushed successfully.

Has anyone tried to do the same thing? Any suggestions from you guys?

Thanks!

Regards,
Siyuan


Question about accumulator

2018-01-23 Thread hsy...@gmail.com
I have a small application like this

from threading import Thread
from time import sleep

acc = sc.accumulator(5)

def t_f(x):
    global acc
    sleep(5)
    acc += x

def f(x):
    global acc
    thread = Thread(target=t_f, args=(x,))
    thread.start()
    # thread.join() # without this it doesn't work

rdd = sc.parallelize([1, 2, 4, 1])
rdd.foreach(f)
sleep(30)
print(acc.value)
assert acc.value == 13

The code doesn't work unless I uncomment the thread.join

Any idea why?


Re: [EXTERNAL] kafka

2016-11-01 Thread hsy...@gmail.com
Hey Raja,

The setup for the secure Kafka input operator is not easy. You can follow
these steps.
1. Follow the Kafka documentation to set up your brokers properly (
https://kafka.apache.org/090/documentation.html#security_overview)
2. You have to manually create the client JAAS file (
https://kafka.apache.org/090/documentation.html#security_overview)
   A sample file would look like this:

   KafkaClient {
     com.sun.security.auth.module.Krb5LoginModule required
     useKeyTab=true
     storeKey=true
     keyTab="/etc/security/keytabs/kafka_client.keytab"
     principal="kafka-clien...@example.com";
   };

3. On the operator you have to set the attribute JVM_OPTS to
   -Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf

4. On the operator you have to set the consumer properties, for example:

   set dt.operator.$your_operator_name.{consumerProps.security.protocol} to
   SASL or whichever security mechanism you use
   set dt.operator.$your_operator_name.{consumerProps.sasl.kerberos.service.name}
   to kafka


Hope this would help you!


Regards,
Siyuan



On Sun, Oct 30, 2016 at 10:56 PM, Raja.Aravapalli <
raja.aravapa...@target.com> wrote:

>
>
> Hi Team,
>
>
>
> Can someone pls help me with below requested information ?
>
>
>
> Does apache apex have any inbuilt kafka input operator to read from Kafka
> 0.9 secure kafka topics?
>
>
>
> Thanks a lot.
>
>
>
> Regards,
>
> Raja.
>
>
>
> *From: *"Raja.Aravapalli" 
> *Reply-To: *"users@apex.apache.org" 
> *Date: *Monday, October 24, 2016 at 2:29 PM
> *To: *"users@apex.apache.org" 
> *Subject: *[EXTERNAL] kafka
>
>
>
>
>
> Hi,
>
>
>
> Do we have any kaka operator readily available to consume messages from
> secure kafka topics in kafka 0.9 clusters?
>
>
>
> Thanks a lot.
>
>
>
>
>
> Regards,
>
> Raja.
>


Re: Datatorrent fault tolerance

2016-10-07 Thread hsy...@gmail.com
Oh I see, you want to send to different topics. Well, then you have to give
some dummy value to the topic property on the operator.

Regards,
Siyuan

On Fri, Oct 7, 2016 at 11:38 AM, Jaspal Singh <jaspal.singh1...@gmail.com>
wrote:

> Siyuan,
>
> So for the output operator, we have specified it as a part of our logic
> itself.
>
> public class KafkaSinglePortExactlyOnceOutputOperator extends 
> AbstractKafkaOutputOperator {
>
> private static final Logger LOG = 
> LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class);
>
> public transient final DefaultInputPort<Tenant> in = new
> DefaultInputPort<Tenant>() {
>
> Gson gson = new Gson();
>
> @Override
> public void process(Tenant tenant) {
>
> try {
> Producer<String, String> producer = getKafkaProducer();
> //ObjectMapper mapper = new ObjectMapper();
> long now = System.currentTimeMillis();
> //Configuration conf = HBaseConfiguration.create();
> //TenantDao dao = new TenantDao(conf);
> //ArrayList puts = new ArrayList<>();
> if (tenant != null) {
> //Tenant tenant = tenant.next();
> if (StringUtils.isNotEmpty(tenant.getGl())) {
> producer.send(new ProducerRecord<String, 
> String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:certUpdate",
>  tenant.getVolumeName(), gson.toJson(tenant)));
> //puts.add(dao.mkPut(tenant));
> } else {
> producer.send(new ProducerRecord<String, 
> String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:error",
>  tenant.getVolumeName(), gson.toJson(tenant)));
>
>         }
> producer.flush();
> }
> }
>
>
>
> Thanks!!
>
> On Fri, Oct 7, 2016 at 1:34 PM, hsy...@gmail.com <hsy...@gmail.com> wrote:
>
>> Jaspal,
>>
>> I think you miss the kafkaOut  :)
>>
>> Regards,
>> Siyuan
>>
>> On Fri, Oct 7, 2016 at 11:32 AM, Jaspal Singh <jaspal.singh1...@gmail.com
>> > wrote:
>>
>>> Siyuan,
>>>
>>> That's how we have given it in properties file:
>>>
>>> [image: Inline image 1]
>>>
>>>
>>> Thanks!!
>>>
>>> On Fri, Oct 7, 2016 at 1:27 PM, hsy...@gmail.com <hsy...@gmail.com>
>>> wrote:
>>>
>>>> Jaspal,
>>>>
>>>> Topic is a mandatory property you have to set. In mapr, the value
>>>> should be set to the full stream path example:  
>>>> /your/stream/path:streamname
>>>>
>>>> Regards,
>>>> Siyuan
>>>>
>>>> On Fri, Oct 7, 2016 at 11:22 AM, Jaspal Singh <
>>>> jaspal.singh1...@gmail.com> wrote:
>>>>
>>>>> After making the change, we are getting the below error while
>>>>> application launch:
>>>>>
>>>>> *An error occurred trying to launch the application. Server message:
>>>>> javax.validation.ConstraintViolationException: Operator kafkaOut violates
>>>>> constraints
>>>>> [ConstraintViolationImpl{rootBean=com.example.datatorrent.KafkaSinglePortExactlyOnceOutputOperator@4726f93f,
>>>>> propertyPath='topic', message='may not be null', *
>>>>>
>>>>>
>>>>>
>>>>> Thanks!!
>>>>>
>>>>> On Fri, Oct 7, 2016 at 1:13 PM, Jaspal Singh <
>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>
>>>>>> So I just changes the malhar-kafka version to 3.5.0, I was able to
>>>>>> import the AbstractOutputOperator. Let me try to launch it now.
>>>>>>
>>>>>> Thanks for your inputs !!
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Oct 7, 2016 at 1:09 PM, Jaspal Singh <
>>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>>
>>>>>>> Should we use malhar-library version 3.5 then ?
>>>>>>>
>>>>>>>
>>>>>>> Thanks!!
>>>>>>>
>>>>>>> On Fri, Oct 7, 2016 at 1:04 PM, Thomas Weise <t...@apache.org> wrote:
>>>>>>>
>>>>>>>> Please make sure to depend on version 3.5 of malhar-kafka in

Re: Datatorrent fault tolerance

2016-10-07 Thread hsy...@gmail.com
Jaspal,

I think you are missing the kafkaOut :)

Regards,
Siyuan

On Fri, Oct 7, 2016 at 11:32 AM, Jaspal Singh <jaspal.singh1...@gmail.com>
wrote:

> Siyuan,
>
> That's how we have given it in properties file:
>
> [image: Inline image 1]
>
>
> Thanks!!
>
> On Fri, Oct 7, 2016 at 1:27 PM, hsy...@gmail.com <hsy...@gmail.com> wrote:
>
>> Jaspal,
>>
>> Topic is a mandatory property you have to set. In mapr, the value should
>> be set to the full stream path example:  /your/stream/path:streamname
>>
>> Regards,
>> Siyuan
>>
>> On Fri, Oct 7, 2016 at 11:22 AM, Jaspal Singh <jaspal.singh1...@gmail.com
>> > wrote:
>>
>>> After making the change, we are getting the below error while
>>> application launch:
>>>
>>> *An error occurred trying to launch the application. Server message:
>>> javax.validation.ConstraintViolationException: Operator kafkaOut violates
>>> constraints
>>> [ConstraintViolationImpl{rootBean=com.example.datatorrent.KafkaSinglePortExactlyOnceOutputOperator@4726f93f,
>>> propertyPath='topic', message='may not be null', *
>>>
>>>
>>>
>>> Thanks!!
>>>
>>> On Fri, Oct 7, 2016 at 1:13 PM, Jaspal Singh <jaspal.singh1...@gmail.com
>>> > wrote:
>>>
>>>> So I just changes the malhar-kafka version to 3.5.0, I was able to
>>>> import the AbstractOutputOperator. Let me try to launch it now.
>>>>
>>>> Thanks for your inputs !!
>>>>
>>>>
>>>>
>>>> On Fri, Oct 7, 2016 at 1:09 PM, Jaspal Singh <
>>>> jaspal.singh1...@gmail.com> wrote:
>>>>
>>>>> Should we use malhar-library version 3.5 then ?
>>>>>
>>>>>
>>>>> Thanks!!
>>>>>
>>>>> On Fri, Oct 7, 2016 at 1:04 PM, Thomas Weise <t...@apache.org> wrote:
>>>>>
>>>>>> Please make sure to depend on version 3.5 of malhar-kafka in pom.xml.
>>>>>> This operator is not in malhar-library, it's a separate module.
>>>>>>
>>>>>>
>>>>>> On Fri, Oct 7, 2016 at 11:01 AM, Jaspal Singh <
>>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Siyuan,
>>>>>>>
>>>>>>> I am using the same Kafka producer as you mentioned. But I am not
>>>>>>> seeing the AbstractKafkaOutputOperator in malhar library while import.
>>>>>>>
>>>>>>>
>>>>>>> Thanks!!
>>>>>>>
>>>>>>> On Fri, Oct 7, 2016 at 12:52 PM, hsy...@gmail.com <hsy...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Also which kafka output operator you are using?
>>>>>>>> Please use org.apache.apex.malhar.kafka.AbstractOutputOperator
>>>>>>>> instead of com.datatorrent.contrib.kafka.AbstractOutputOperator.
>>>>>>>> Only the org.apache.apex.malhar.kafka.AbstractOutputOperator works
>>>>>>>> with MapR stream, the latter only works with kafka 0.8.* or 0.9
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Siyuan
>>>>>>>>
>>>>>>>> On Fri, Oct 7, 2016 at 10:38 AM, hsy...@gmail.com <hsy...@gmail.com
>>>>>>>> > wrote:
>>>>>>>>
>>>>>>>>> Hey Jaspal,
>>>>>>>>>
>>>>>>>>> Did you add any code to existing 
>>>>>>>>> KafkaSinglePortExactlyOnceOutputOperator
>>>>>>>>> from malhar?  If so please make sure the producer you use here is
>>>>>>>>> org.apache.kafka.clients.producer.KafkaProducer instead of
>>>>>>>>> kafka.javaapi.producer.Producer.  That is old api and that is not
>>>>>>>>> supported by MapR stream.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Siyuan
>>>>>>>>>
>>>>>>>>> On Fri, Oct 7, 2016 at 9:01 AM, Jaspal Singh <
>>>>>>>>> jaspal.singh1...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Thomas,

Re: Datatorrent fault tolerance

2016-10-07 Thread hsy...@gmail.com
Hey Jaspal,

Did you add any code to the existing KafkaSinglePortExactlyOnceOutputOperator
from Malhar? If so, please make sure the producer you use here is
org.apache.kafka.clients.producer.KafkaProducer instead of
kafka.javaapi.producer.Producer. The latter is the old API and is not
supported by MapR Streams.


Regards,
Siyuan

On Fri, Oct 7, 2016 at 9:01 AM, Jaspal Singh 
wrote:

> Thomas,
>
> Below is the operator implementation we are trying to run. This operator
> is getting an object of tenant class from updtream operator.
>
> public class KafkaSinglePortExactlyOnceOutputOperator extends AbstractKafkaOutputOperator {
>
>     private static final Logger LOG = LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class);
>
>     public transient final DefaultInputPort<Tenant> in = new DefaultInputPort<Tenant>() {
>
>         Gson gson = new Gson();
>
>         @Override
>         public void process(Tenant tenant) {
>
>             try {
>                 Producer<String, String> producer = getKafkaProducer();
>                 //ObjectMapper mapper = new ObjectMapper();
>                 long now = System.currentTimeMillis();
>                 //Configuration conf = HBaseConfiguration.create();
>                 //TenantDao dao = new TenantDao(conf);
>                 //ArrayList puts = new ArrayList<>();
>                 if (tenant != null) {
>                     //Tenant tenant = tenant.next();
>                     if (StringUtils.isNotEmpty(tenant.getGl())) {
>                         producer.send(new ProducerRecord<String, String>(
>                             "/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:certUpdate",
>                             tenant.getVolumeName(), gson.toJson(tenant)));
>                         //puts.add(dao.mkPut(tenant));
>                     } else {
>                         producer.send(new ProducerRecord<String, String>(
>                             "/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:error",
>                             tenant.getVolumeName(), gson.toJson(tenant)));
>                     }
>                     producer.flush();
>                 }
>             }
>
>
> After building the application, it throws error during launch:
>
> An error occurred trying to launch the application. Server message:
> java.lang.NoClassDefFoundError: Lkafka/javaapi/producer/Producer; at
> java.lang.Class.getDeclaredFields0(Native Method) at java.lang.Class.
> privateGetDeclaredFields(Class.java:2583) at java.lang.Class.
> getDeclaredFields(Class.java:1916) at
>
>
> Thanks
> Jaspal
>
> On Fri, Oct 7, 2016 at 10:42 AM, Jaspal Singh 
> wrote:
>
>> Thomas,
>>
>> I was trying to refer to the input from previous operator.
>>
>> Another thing when we extend the AbstractKafkaOutputOperator, do we need
>> to specify  ? Since we are getting an object of class type from
>> previous operator.
>>
>>
>> Thanks
>> Jaspal
>>
>> On Fri, Oct 7, 2016 at 10:12 AM, Thomas Weise  wrote:
>>
>>> Are you referring to the upstream operator in the DAG or the state of
>>> the previous application after relaunch? Since the data is stored in MapR
>>> streams, an operator that is a producer can also act as a consumer. Please
>>> clarify your question.
>>>
>>>
>>> On Fri, Oct 7, 2016 at 7:59 AM, Jaspal Singh >> > wrote:
>>>
 Hi Thomas,

 I have a question, so when we are using
 *KafkaSinglePortExactlyOnceOutputOperator* to write results into
 maprstream topic will it be able to read messgaes from the previous
 operator ?


 Thanks
 Jaspal

 On Thu, Oct 6, 2016 at 6:28 PM, Thomas Weise  wrote:

> For recovery you need to set the window data manager like so:
>
> https://github.com/DataTorrent/examples/blob/master/tutorial
> s/exactly-once/src/main/java/com/example/myapexapp/Applicati
> on.java#L33
>
> That will also apply to stateful restart of the entire application
> (relaunch from previous instance's checkpointed state).
>
> For cold restart, you would need to consider the property you mention
> and decide what is applicable to your use case.
>
> Thomas
>
>
> On Thu, Oct 6, 2016 at 4:16 PM, Jaspal Singh <
> jaspal.singh1...@gmail.com> wrote:
>
>> Ok now I get it. Thanks for the nice explaination !!
>>
>> One more thing, so you mentioned about checkpointing the offset
>> ranges to replay in same order from kafka.
>>
>> Is there any property we need to configure to do that? like
>> initialOffset set to APPLICATION_OR_LATEST.
>>
>>
>> Thanks
>> Jaspal
>>
>>
>> On Thursday, October 6, 2016, Thomas Weise 
>> wrote:
>>
>>> What you want is the effect of exactly-once output (that's why we
>>> call it also end-to-end exactly-once). There is no such thing as
>>> exactly-once processing in a distributed system. In 

Re: Apex and Malhar Java 8 Certified

2016-10-05 Thread hsy...@gmail.com
I think the problem is what people expect when we say "certified". To me,
if I see something is certified with Java 8, I would assume that I can use
the Java 8 API (new features like streams, lambdas, etc.) to write operator
code, not just run the code with JRE 8 or compile existing code with JDK 8
and run it.

I did try some operator code with the Java 8 stream API and some lambda
expressions in a few methods and it works. I haven't tried any operators
with new Java 8 features in their non-transient properties. We should also
take a look to see if Kryo fully works with Java 8 classes/types.

Regards,
Siyuan
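
A small sketch of that distinction: Java 8 API calls inside a method body
are just regular code paths, while anything kept in a non-transient field
becomes part of the operator's checkpointed state and has to go through
Kryo. This is a hypothetical operator, not something from Malhar:

import java.util.Arrays;
import java.util.function.Function;
import java.util.stream.Collectors;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.BaseOperator;

public class Java8SketchOperator extends BaseOperator {

  // Transient: re-created in setup(), never serialized/checkpointed by the platform.
  private transient Function<String, String> normalizer;

  public final transient DefaultOutputPort<String> out = new DefaultOutputPort<>();

  public final transient DefaultInputPort<String> in = new DefaultInputPort<String>() {
    @Override
    public void process(String line) {
      // Java 8 stream API inside the method body.
      String joined = Arrays.stream(line.split(" "))
          .map(normalizer)
          .collect(Collectors.joining(" "));
      out.emit(joined);
    }
  };

  @Override
  public void setup(OperatorContext context) {
    // If this lambda lived in a non-transient field, Kryo would have to serialize it.
    normalizer = s -> s.toLowerCase();
  }
}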



On Wed, Oct 5, 2016 at 9:34 AM, Munagala Ramanath 
wrote:

> You can use Java 8 but the source and target compatibility configuration
> parameters in
> your pom.xml for the maven-compiler-plugin still need to be 1.7
>
> Ram
>
> On Wed, Oct 5, 2016 at 9:14 AM, Feldkamp, Brandon (CONT) <
> brandon.feldk...@capitalone.com> wrote:
>
>> So is it safe to say that JDK 1.8 is supported to the same extent that
>> 1.7 is?
>>
>>
>>
>> We’re not running into any issues currently (that I know of…feel free to
>> chime back in Alex) but we’re making design decision and were curious about
>> being able to use Java 8 features.
>>
>>
>>
>> Thanks!
>>
>> Brandon
>>
>>
>>
>> *From: *Vlad Rozov 
>> *Organization: *DataTorrent
>> *Reply-To: *"users@apex.apache.org" 
>> *Date: *Monday, October 3, 2016 at 11:43 PM
>> *To: *"users@apex.apache.org" 
>> *Subject: *Re: Apex and Malhar Java 8 Certified
>>
>>
>>
>> We do test on Java 8 - both Apex Core and Malhar Apache Jenkins builds
>> use JDK 1.8 to run tests.
>>
>> Thank you,
>>
>> Vlad
>>
>> On 10/3/16 15:45, Thomas Weise wrote:
>>
>> Apex is built against Java 7 and expected to work as is on Java 8 (Hadoop
>> distros still support 1.7 as well). Are you running into specific issues?
>>
>>
>>
>> Thanks,
>>
>> Thomas
>>
>>
>>
>> On Mon, Oct 3, 2016 at 12:06 PM, McCullough, Alex <
>> alex.mccullo...@capitalone.com> wrote:
>>
>> Hey All,
>>
>>
>>
>> I know there were talks about this at some point but is Apex and/or
>> Malhar Java 8 certified? If not, is there a current plan and date to be so?
>>
>>
>>
>> Thanks,
>>
>> Alex
>>
>>
>> --
>>
>> The information contained in this e-mail is confidential and/or
>> proprietary to Capital One and/or its affiliates and may only be used
>> solely in performance of work or services for Capital One. The information
>> transmitted herewith is intended only for use by the individual or entity
>> to which it is addressed. If the reader of this message is not the intended
>> recipient, you are hereby notified that any review, retransmission,
>> dissemination, distribution, copying or other use of, or taking of any
>> action in reliance upon this information is strictly prohibited. If you
>> have received this communication in error, please contact the sender and
>> delete the material from your computer.
>>
>>
>>
>>
>>
>>
>> --
>>
>> The information contained in this e-mail is confidential and/or
>> proprietary to Capital One and/or its affiliates and may only be used
>> solely in performance of work or services for Capital One. The information
>> transmitted herewith is intended only for use by the individual or entity
>> to which it is addressed. If the reader of this message is not the intended
>> recipient, you are hereby notified that any review, retransmission,
>> dissemination, distribution, copying or other use of, or taking of any
>> action in reliance upon this information is strictly prohibited. If you
>> have received this communication in error, please contact the sender and
>> delete the material from your computer.
>>
>
>


Re: Emit values from an array

2016-09-30 Thread hsy...@gmail.com
Is your pojo a shared object? I think you need to create a new pojo every time.

Regards,
Siyuan
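
To illustrate the point: when one mutable pojo is reused and emitted
repeatedly, every emitted tuple can end up referring to the same
(last-written) instance, which matches the symptom above. A sketch of the
fix, with a hypothetical CarePojo and emitter whose names mirror the snippet
quoted below:

// Hypothetical POJO; the setters follow the quoted code.
class CarePojo {
  private String id;
  private String url;
  private int position;
  private String care;
  public void setID(String id) { this.id = id; }
  public void seturl(String url) { this.url = url; }
  public void setPosition(int position) { this.position = position; }
  public void setCares(String care) { this.care = care; }
}

class CaresEmitter {
  // Stand-in for the operator's output port (e.g. DefaultOutputPort<CarePojo> in Apex).
  interface Port { void emit(CarePojo pojo); }

  void emitAll(String id, String url, String[] cares, Port caresOut) {
    int i = 0;
    for (int j = 0; j < cares.length; j++) {
      CarePojo pojo = new CarePojo(); // fresh instance per emitted tuple, not one shared object
      pojo.setID(id);
      pojo.seturl(url);
      pojo.setPosition(i);
      pojo.setCares(cares[j]);
      i++;
      caresOut.emit(pojo);
    }
  }
}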

On Thu, Sep 29, 2016 at 3:03 PM, Jaikit Jilka  wrote:

> Hello,
>
> I am trying to emit values from an array. I am emitting using an for loop.
> Number of records emitted is correct but it is emitting only the last value
> of the array multiple times. So how to emit different values of array.
>
> int i = 0;
> for (int j = 0; j < cares.length; j++) {
>     pojo.setID(id);
>     pojo.seturl(url);
>     pojo.setPosition(i);
>     pojo.setCares(cares[j]);
>     i++;
>     CaresOut.emit(pojo);
> }
>
>
> Thank you,
>
> Jaikit Jilka
>


Re: Malhar 0.8.1.1 Kafka Operator and chroot

2016-08-25 Thread hsy...@gmail.com
Hey Alex,

Does the workaround work? I just want to follow up to see whether my
hypothesis about the root cause is correct. Thanks!

Regards,
Siyuan

On Wed, Aug 24, 2016 at 10:56 AM, hsy...@gmail.com <hsy...@gmail.com> wrote:

> Hey Alex,
>
> Yeah, I think there is a bug for multitenant kafka support in the code. I
> have created a ticket
> https://issues.apache.org/jira/browse/APEXMALHAR-2199
>
> For now can you try one thing:
> Can you try to set your zookeeper to something like this:
>
> <property>
>   <name>dt.operator.kafkaInputOperator.prop.consumer.zookeeper</name>
>   <value>10.##.##.#:2181/kafka2,10.##.##.##:2181/kafka2,10.##.##.##:2181/kafka2,10.##.##.#:2181/kafka2</value>
> </property>
>
>
> or you can just try to set just one of the zookeeper nodes.
>
> For kafka client it only needs to know one running node but you'll lose
> zookeeper HA
>
>
> Regards,
>
> Siyuan
>
> On Wed, Aug 24, 2016 at 10:40 AM, McCullough, Alex <
> alex.mccullo...@capitalone.com> wrote:
>
>> ONE_TO_ONE
>>
>>
>>
>>
>>
>>
>>
>> *From: *"hsy...@gmail.com" <hsy...@gmail.com>
>> *Reply-To: *"users@apex.apache.org" <users@apex.apache.org>
>> *Date: *Wednesday, August 24, 2016 at 1:38 PM
>>
>> *To: *"users@apex.apache.org" <users@apex.apache.org>
>> *Subject: *Re: Malhar 0.8.1.1 Kafka Operator and chroot
>>
>>
>>
>> Hey Alex,
>>
>>
>>
>> Do you use ONE_TO_ONE or ONE_TO_MANY partition?
>>
>>
>>
>> Regards,
>>
>> Siyuan
>>
>>
>>
>> On Wed, Aug 24, 2016 at 10:27 AM, McCullough, Alex <
>> alex.mccullo...@capitalone.com> wrote:
>>
>> Hey Siyuan,
>>
>>
>>
>> We are using 3.4.0
>>
>>
>>
>> Thanks,
>>
>> Alex
>>
>> *From: *"hsy...@gmail.com" <hsy...@gmail.com>
>> *Reply-To: *"users@apex.apache.org" <users@apex.apache.org>
>> *Date: *Wednesday, August 24, 2016 at 12:47 PM
>> *To: *"users@apex.apache.org" <users@apex.apache.org>
>> *Subject: *Re: Malhar 0.8.1.1 Kafka Operator and chroot
>>
>>
>>
>> Hey McCullough,
>>
>>
>>
>> What malhar version do you use?
>>
>>
>>
>> Regards,
>>
>> Siyuan
>>
>>
>>
>> On Wed, Aug 24, 2016 at 9:07 AM, McCullough, Alex <
>> alex.mccullo...@capitalone.com> wrote:
>>
>> Hey All,
>>
>>
>>
>> We are using the 0.8.1 kafka operator and the ZK connection string has a
>> chroot on it. We get errors when launching and the app fails, is there a
>> proper way in apex to append a chroot?
>>
>>
>>
>>
>>
>> **the ip’s are masked with #, but that’s not how they appear in our code
>> obviously**
>>
>>
>>
>> When we add this to the property for ZK:
>>
>>
>>
>>
>>
>> <property>
>>   <name>dt.operator.kafkaInputOperator.prop.consumer.zookeeper</name>
>>   <value>10.##.##.#:2181,10.##.##.##:2181,10.##.##.##:2181,10.##.##.#:2181/kafka2</value>
>> </property>
>>
>>
>>
>>
>>
>>
>>
>> We get this error (connecting to a cluster without chroot it works fine):
>>
>>
>>
>>
>>
>> 2016-08-24 11:55:13,448 [main-EventThread] INFO  zkclient.ZkClient
>> processStateChanged - zookeeper state changed (SyncConnected)
>>
>> 2016-08-24 11:55:13,585 [ZkClient-EventThread-30-10.##
>> .##.#:2181,10.##.##.##:2181,10.##.##.#:2181/kafka2,10.##.##.##:2181]
>> INFO  zkclient.ZkEventThread run - Terminate ZkClient event thread.
>>
>> 2016-08-24 11:55:13,596 [main] INFO  zookeeper.ZooKeeper close - Session:
>> 0x4558654aacf4263 closed
>>
>> 2016-08-24 11:55:13,596 [main-EventThread] INFO  zookeeper.ClientCnxn run
>> - EventThread shut down
>>
>> 2016-08-24 11:55:13,597 [main] INFO  kafka.AbstractKafkaInputOperator
>> definePartitions - [ONE_TO_ONE]: Initializing partition(s)
>>
>> 2016-08-24 11:55:13,602 [main] INFO  service.AbstractService noteFailure
>> - Service com.datatorrent.stram.StreamingAppMasterService failed in
>> state INITED; cause: java.lang.IllegalArgumentException: there has to be
>> one idempotent storage manager
>>
>> java.lang.IllegalArgumentException: there has to be one idempotent
>> storage manager
>>
>> at com.google.common.base.Preconditions.checkArgument(Precondit
>> ions.java:93)
>>
>> at org.apache.apex.malhar.lib.

Re: Malhar 0.8.1.1 Kafka Operator and chroot

2016-08-24 Thread hsy...@gmail.com
Hey Alex,

Do you use ONE_TO_ONE or ONE_TO_MANY partition?

Regards,
Siyuan

On Wed, Aug 24, 2016 at 10:27 AM, McCullough, Alex <
alex.mccullo...@capitalone.com> wrote:

> Hey Siyuan,
>
>
>
> We are using 3.4.0
>
>
>
> Thanks,
>
> Alex
>
> *From: *"hsy...@gmail.com" <hsy...@gmail.com>
> *Reply-To: *"users@apex.apache.org" <users@apex.apache.org>
> *Date: *Wednesday, August 24, 2016 at 12:47 PM
> *To: *"users@apex.apache.org" <users@apex.apache.org>
> *Subject: *Re: Malhar 0.8.1.1 Kafka Operator and chroot
>
>
>
> Hey McCullough,
>
>
>
> What malhar version do you use?
>
>
>
> Regards,
>
> Siyuan
>
>
>
> On Wed, Aug 24, 2016 at 9:07 AM, McCullough, Alex <
> alex.mccullo...@capitalone.com> wrote:
>
> Hey All,
>
>
>
> We are using the 0.8.1 kafka operator and the ZK connection string has a
> chroot on it. We get errors when launching and the app fails, is there a
> proper way in apex to append a chroot?
>
>
>
>
>
> **the ip’s are masked with #, but that’s not how they appear in our code
> obviously**
>
>
>
> When we add this to the property for ZK:
>
>
>
>
>
> <property>
>   <name>dt.operator.kafkaInputOperator.prop.consumer.zookeeper</name>
>   <value>10.##.##.#:2181,10.##.##.##:2181,10.##.##.##:2181,10.##.##.#:2181/kafka2</value>
> </property>
>
>
>
>
>
>
>
> We get this error (connecting to a cluster without chroot it works fine):
>
>
>
>
>
> 2016-08-24 11:55:13,448 [main-EventThread] INFO  zkclient.ZkClient
> processStateChanged - zookeeper state changed (SyncConnected)
>
> 2016-08-24 11:55:13,585 [ZkClient-EventThread-30-10.##
> .##.#:2181,10.##.##.##:2181,10.##.##.#:2181/kafka2,10.##.##.##:2181]
> INFO  zkclient.ZkEventThread run - Terminate ZkClient event thread.
>
> 2016-08-24 11:55:13,596 [main] INFO  zookeeper.ZooKeeper close - Session:
> 0x4558654aacf4263 closed
>
> 2016-08-24 11:55:13,596 [main-EventThread] INFO  zookeeper.ClientCnxn run
> - EventThread shut down
>
> 2016-08-24 11:55:13,597 [main] INFO  kafka.AbstractKafkaInputOperator
> definePartitions - [ONE_TO_ONE]: Initializing partition(s)
>
> 2016-08-24 11:55:13,602 [main] INFO  service.AbstractService noteFailure -
> Service com.datatorrent.stram.StreamingAppMasterService failed in state
> INITED; cause: java.lang.IllegalArgumentException: there has to be one
> idempotent storage manager
>
> java.lang.IllegalArgumentException: there has to be one idempotent
> storage manager
>
> at com.google.common.base.Preconditions.checkArgument(
> Preconditions.java:93)
>
> at org.apache.apex.malhar.lib.wal.FSWindowDataManager.partitioned(
> FSWindowDataManager.java:251)
>
> at com.datatorrent.contrib.kafka.AbstractKafkaInputOperator.
> definePartitions(AbstractKafkaInputOperator.java:637)
>
> at com.datatorrent.stram.plan.physical.PhysicalPlan.
> initPartitioning(PhysicalPlan.java:752)
>
> at com.datatorrent.stram.plan.physical.PhysicalPlan.
> addLogicalOperator(PhysicalPlan.java:1676)
>
> at com.datatorrent.stram.plan.physical.PhysicalPlan.(
> PhysicalPlan.java:378)
>
> at com.datatorrent.stram.StreamingContainerManager.(
> StreamingContainerManager.java:418)
>
> at com.datatorrent.stram.StreamingContainerManager.getInstance(
> StreamingContainerManager.java:3023)
>
> at com.datatorrent.stram.StreamingAppMasterService.serviceInit(
> StreamingAppMasterService.java:551)
>
> at org.apache.hadoop.service.AbstractService.init(
> AbstractService.java:163)
>
> at com.datatorrent.stram.StreamingAppMaster.main(
> StreamingAppMaster.java:102)
>
> 2016-08-24 11:55:13,604 [main] WARN  service.AbstractService stopQuietly -
> When stopping the service com.datatorrent.stram.StreamingAppMasterService
> : java.lang.NullPointerException
>
> java.lang.NullPointerException
>
> at com.datatorrent.stram.StreamingAppMasterService.serviceStop(
> StreamingAppMasterService.java:629)
>
> at org.apache.hadoop.service.AbstractService.stop(
> AbstractService.java:221)
>
> at org.apache.hadoop.service.ServiceOperations.stop(
> ServiceOperations.java:52)
>
> at org.apache.hadoop.service.ServiceOperations.stopQuietly(
> ServiceOperations.java:80)
>
> at org.apache.hadoop.service.AbstractService.init(
> AbstractService.java:171)
>
> at com.datatorrent.stram.StreamingAppMaster.main(
> StreamingAppMaster.java:102)
>
> 2016-08-24 11:55:13,605 [main] ERROR stram.StreamingAppMaster main -
> Exiting Application Master
>
> java.lang.IllegalArgumentException: there has to be on

Re: can operators emit on a different from the operator itself thread?

2016-08-10 Thread hsy...@gmail.com
Hey Vlad,

Thanks for bringing this up. Is there an easy way to detect unexpected use
of the emit method without hurting performance? Or could we at least detect
it in debug mode?

Regards,
Siyuan

On Wed, Aug 10, 2016 at 11:27 AM, Vlad Rozov 
wrote:

> The short answer is no, creating worker thread to emit tuples is not
> supported by Apex and will lead to an undefined behavior. Operators in Apex
> have strong thread affinity and all interaction with the platform must
> happen on the operator thread.
>
> Vlad
>


Re: Force Fail Application

2016-07-18 Thread hsy...@gmail.com
Hey Michael,

You can throw a ShutdownException.

Siyuan
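
A minimal sketch of what that can look like when the check happens in
setup(); the operator name, the property, and the path are made up, and the
ShutdownException import assumes the exception lives in com.datatorrent.api:

import java.io.File;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.ShutdownException; // assumed package for the Apex ShutdownException
import com.datatorrent.common.util.BaseOperator;

// Hypothetical operator that refuses to start without its required file.
public class RequiredFileCheckOperator extends BaseOperator {

  // Example path; could be set via dt.operator.<name>.prop.requiredFilePath.
  private String requiredFilePath = "/tmp/required-input.cfg";

  @Override
  public void setup(OperatorContext context) {
    if (!new File(requiredFilePath).exists()) {
      // Throwing ShutdownException from an operator asks the platform to shut the
      // whole application down instead of redeploying the failed container.
      throw new ShutdownException();
    }
  }

  public void setRequiredFilePath(String requiredFilePath) {
    this.requiredFilePath = requiredFilePath;
  }

  public String getRequiredFilePath() {
    return requiredFilePath;
  }
}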

On Mon, Jul 18, 2016 at 10:06 AM, Silver, Michael <
michael.sil...@capitalone.com> wrote:

>
>
>
>
> Hello,
>
>
>
> I am looking for a solution to force shutdown or fail my application. I
> have an operator that checks that a file (which is needed for the
> application to run) is present during setup. If the file is not present I
> want the entire application to fail. How would I do this in apex?
>
>
>
> Thank you,
>
>
>
> Michael
>
> --
>
> The information contained in this e-mail is confidential and/or
> proprietary to Capital One and/or its affiliates and may only be used
> solely in performance of work or services for Capital One. The information
> transmitted herewith is intended only for use by the individual or entity
> to which it is addressed. If the reader of this message is not the intended
> recipient, you are hereby notified that any review, retransmission,
> dissemination, distribution, copying or other use of, or taking of any
> action in reliance upon this information is strictly prohibited. If you
> have received this communication in error, please contact the sender and
> delete the material from your computer.
>


Re: Regarding using Scala to develop Apex app.

2016-07-15 Thread hsy...@gmail.com
BTW Akshay, if you are using an anonymous function as a field in the
operator, it's very likely that the function is stateless. If that's the
case, you can try to mark it transient (
http://www.scala-lang.org/api/rc2/scala/transient.html).


On Thu, Jul 14, 2016 at 11:06 PM, Akshay S Harale <
akshay.har...@synerzip.com> wrote:

> Hello,
>
> I found one blog post on writing apex app in scala
> .
> First I tried simple app it worked very well but when I introduced some
> anonymous functions in program, it started throwing kryo serialisation
> exception:
> *com.esotericsoftware.kryo.KryoException: Class cannot be created (missing
> no-arg constructor): com.sample.Aggregator$$anon$1*
>
> My question is : Will Apex have full support for scala in future ?
>
> Regards,
> Akshay S. Harale
> Software Developer @ Synerzip
> Skype – akshayharale
>
> This e-mail, including any attached files, may contain confidential and
> privileged information for the sole use of the intended recipient. Any
> review, use, distribution, or disclosure by others is strictly prohibited.
> If you are not the intended recipient (or authorized to receive information
> for the intended recipient), please contact the sender by reply e-mail and
> delete all copies of this message.
>
>


Re: A proposal for Malhar

2016-07-12 Thread hsy...@gmail.com
Why not have a shared Google sheet with a list of operators and the options
we want to apply to each? I think it's case by case, but retiring unused or
obsolete operators is important and we should do it sooner rather than
later.

Regards,
Siyuan

On Tue, Jul 12, 2016 at 1:09 PM, Amol Kekre <a...@datatorrent.com> wrote:

>
> My vote is to do 2&3
>
> Thks
> Amol
>
>
> On Tue, Jul 12, 2016 at 12:14 PM, Kottapalli, Venkatesh <
> vkottapa...@directv.com> wrote:
>
>> +1 for deprecating the packages listed below.
>>
>> -Original Message-
>> From: hsy...@gmail.com [mailto:hsy...@gmail.com]
>> Sent: Tuesday, July 12, 2016 12:01 PM
>>
>> +1
>>
>> On Tue, Jul 12, 2016 at 11:53 AM, David Yan <da...@datatorrent.com>
>> wrote:
>>
>> > Hi all,
>> >
>> > I would like to renew the discussion of retiring operators in Malhar.
>> >
>> > As stated before, the reason why we would like to retire operators in
>> > Malhar is because some of them were written a long time ago before
>> > Apache incubation, and they do not pertain to real use cases, are not
>> > up to par in code quality, have no potential for improvement, and
>> > probably completely unused by anybody.
>> >
>> > We do not want contributors to use them as a model of their
>> > contribution, or users to use them thinking they are of quality, and
>> then hit a wall.
>> > Both scenarios are not beneficial to the reputation of Apex.
>> >
>> > The initial 3 packages that we would like to target are *lib/algo*,
>> > *lib/math*, and *lib/streamquery*.
>>
>> >
>> > I'm adding this thread to the users list. Please speak up if you are
>> > using any operator in these 3 packages. We would like to hear from you.
>> >
>> > These are the options I can think of for retiring those operators:
>> >
>> > 1) Completely remove them from the malhar repository.
>> > 2) Move them from malhar-library into a separate artifact called
>> > malhar-misc
>> > 3) Mark them deprecated and add to their javadoc that they are no
>> > longer supported
>> >
>> > Note that 2 and 3 are not mutually exclusive. Any thoughts?
>> >
>> > David
>> >
>> > On Tue, Jun 7, 2016 at 2:27 PM, Pramod Immaneni
>> > <pra...@datatorrent.com>
>> > wrote:
>> >
>> >> I wanted to close the loop on this discussion. In general everyone
>> >> seemed to be favorable to this idea with no serious objections. Folks
>> >> had good suggestions like documenting capabilities of operators, come
>> >> up well defined criteria for graduation of operators and what those
>> >> criteria may be and what to do with existing operators that may not
>> >> yet be mature or unused.
>> >>
>> >> I am going to summarize the key points that resulted from the
>> >> discussion and would like to proceed with them.
>> >>
>> >>- Operators that do not yet provide the key platform capabilities to
>> >>make an operator useful across different applications such as
>> >> reusability,
>> >>partitioning static or dynamic, idempotency, exactly once will
>> still be
>> >>accepted as long as they are functionally correct, have unit tests
>> >> and will
>> >>go into a separate module.
>> >>- Contrib module was suggested as a place where new contributions
>> go in
>> >>that don't yet have all the platform capabilities and are not yet
>> >> mature.
>> >>If there are no other suggestions we will go with this one.
>> >>- It was suggested the operators documentation list those platform
>> >>capabilities it currently provides from the list above. I will
>> >> document a
>> >>structure for this in the contribution guidelines.
>> >>- Folks wanted to know what would be the criteria to graduate an
>> >>operator to the big leagues :). I will kick-off a separate thread
>> >> for it as
>> >>I think it requires its own discussion and hopefully we can come
>> >> up with a
>> >>set of guidelines for it.
>> >>- David brought up state of some of the existing operators and their
>> >>retirement and the layout of operators in Malhar in general and how
>> it
>> >>causes problems with development. I will ask him to lead the
>> >> d

Re: A proposal for Malhar

2016-07-12 Thread hsy...@gmail.com
Why not have a shared Google sheet with a list of the operators and the option
we want to apply to each one?
I think it's case by case.
But retiring unused or obsolete operators is important, and we should do it
sooner rather than later.

Regards,
Siyuan

On Tue, Jul 12, 2016 at 1:09 PM, Amol Kekre <a...@datatorrent.com> wrote:

>
> My vote is to do 2&3
>
> Thks
> Amol
>
>
> On Tue, Jul 12, 2016 at 12:14 PM, Kottapalli, Venkatesh <
> vkottapa...@directv.com> wrote:
>
>> +1 for deprecating the packages listed below.
>>
>> -Original Message-
>> From: hsy...@gmail.com [mailto:hsy...@gmail.com]
>> Sent: Tuesday, July 12, 2016 12:01 PM
>>
>> +1
>>
>> On Tue, Jul 12, 2016 at 11:53 AM, David Yan <da...@datatorrent.com>
>> wrote:
>>
>> > Hi all,
>> >
>> > I would like to renew the discussion of retiring operators in Malhar.
>> >
>> > As stated before, the reason why we would like to retire operators in
>> > Malhar is because some of them were written a long time ago before
>> > Apache incubation, and they do not pertain to real use cases, are not
>> > up to par in code quality, have no potential for improvement, and
>> > probably completely unused by anybody.
>> >
>> > We do not want contributors to use them as a model of their
>> > contribution, or users to use them thinking they are of quality, and
>> then hit a wall.
>> > Both scenarios are not beneficial to the reputation of Apex.
>> >
>> > The initial 3 packages that we would like to target are *lib/algo*,
>> > *lib/math*, and *lib/streamquery*.
>>
>> >
>> > I'm adding this thread to the users list. Please speak up if you are
>> > using any operator in these 3 packages. We would like to hear from you.
>> >
>> > These are the options I can think of for retiring those operators:
>> >
>> > 1) Completely remove them from the malhar repository.
>> > 2) Move them from malhar-library into a separate artifact called
>> > malhar-misc
>> > 3) Mark them deprecated and add to their javadoc that they are no
>> > longer supported
>> >
>> > Note that 2 and 3 are not mutually exclusive. Any thoughts?
>> >
>> > David
>> >
>> > On Tue, Jun 7, 2016 at 2:27 PM, Pramod Immaneni
>> > <pra...@datatorrent.com>
>> > wrote:
>> >
>> >> I wanted to close the loop on this discussion. In general everyone
>> >> seemed to be favorable to this idea with no serious objections. Folks
>> >> had good suggestions like documenting capabilities of operators, come
>> >> up well defined criteria for graduation of operators and what those
>> >> criteria may be and what to do with existing operators that may not
>> >> yet be mature or unused.
>> >>
>> >> I am going to summarize the key points that resulted from the
>> >> discussion and would like to proceed with them.
>> >>
>> >>- Operators that do not yet provide the key platform capabilities to
>> >>make an operator useful across different applications such as
>> >> reusability,
>> >>partitioning static or dynamic, idempotency, exactly once will
>> still be
>> >>accepted as long as they are functionally correct, have unit tests
>> >> and will
>> >>go into a separate module.
>> >>- Contrib module was suggested as a place where new contributions
>> go in
>> >>that don't yet have all the platform capabilities and are not yet
>> >> mature.
>> >>If there are no other suggestions we will go with this one.
>> >>- It was suggested the operators documentation list those platform
>> >>capabilities it currently provides from the list above. I will
>> >> document a
>> >>structure for this in the contribution guidelines.
>> >>- Folks wanted to know what would be the criteria to graduate an
>> >>operator to the big leagues :). I will kick-off a separate thread
>> >> for it as
>> >>I think it requires its own discussion and hopefully we can come
>> >> up with a
>> >>set of guidelines for it.
>> >>- David brought up state of some of the existing operators and their
>> >>retirement and the layout of operators in Malhar in general and how
>> it
>> >>causes problems with development. I will ask him to lead the
>> >> d

Re: A proposal for Malhar

2016-07-12 Thread hsy...@gmail.com
+1

On Tue, Jul 12, 2016 at 11:53 AM, David Yan  wrote:

> Hi all,
>
> I would like to renew the discussion of retiring operators in Malhar.
>
> As stated before, the reason why we would like to retire operators in
> Malhar is because some of them were written a long time ago before Apache
> incubation, and they do not pertain to real use cases, are not up to par in
> code quality, have no potential for improvement, and probably completely
> unused by anybody.
>
> We do not want contributors to use them as a model of their contribution,
> or users to use them thinking they are of quality, and then hit a wall.
> Both scenarios are not beneficial to the reputation of Apex.
>
> The initial 3 packages that we would like to target are *lib/algo*,
> *lib/math*, and *lib/streamquery*.
>
> I'm adding this thread to the users list. Please speak up if you are using
> any operator in these 3 packages. We would like to hear from you.
>
> These are the options I can think of for retiring those operators:
>
> 1) Completely remove them from the malhar repository.
> 2) Move them from malhar-library into a separate artifact called
> malhar-misc
> 3) Mark them deprecated and add to their javadoc that they are no longer
> supported
>
> Note that 2 and 3 are not mutually exclusive. Any thoughts?
>
> David
>
> On Tue, Jun 7, 2016 at 2:27 PM, Pramod Immaneni 
> wrote:
>
>> I wanted to close the loop on this discussion. In general everyone seemed
>> to be favorable to this idea with no serious objections. Folks had good
>> suggestions like documenting capabilities of operators, come up well
>> defined criteria for graduation of operators and what those criteria may
>> be
>> and what to do with existing operators that may not yet be mature or
>> unused.
>>
>> I am going to summarize the key points that resulted from the discussion
>> and would like to proceed with them.
>>
>>- Operators that do not yet provide the key platform capabilities to
>>make an operator useful across different applications such as
>> reusability,
>>partitioning static or dynamic, idempotency, exactly once will still be
>>accepted as long as they are functionally correct, have unit tests and
>> will
>>go into a separate module.
>>- Contrib module was suggested as a place where new contributions go in
>>that don't yet have all the platform capabilities and are not yet
>> mature.
>>If there are no other suggestions we will go with this one.
>>- It was suggested the operators documentation list those platform
>>capabilities it currently provides from the list above. I will
>> document a
>>structure for this in the contribution guidelines.
>>- Folks wanted to know what would be the criteria to graduate an
>>operator to the big leagues :). I will kick-off a separate thread for
>> it as
>>I think it requires its own discussion and hopefully we can come up
>> with a
>>set of guidelines for it.
>>- David brought up state of some of the existing operators and their
>>retirement and the layout of operators in Malhar in general and how it
>>causes problems with development. I will ask him to lead the
>> discussion on
>>that.
>>
>> Thanks
>>
>> On Fri, May 27, 2016 at 7:47 PM, David Yan  wrote:
>>
>> > The two ideas are not conflicting, but rather complementing.
>> >
>> > On the contrary, putting a new process for people trying to contribute
>> > while NOT addressing the old unused subpar operators in the repository
>> is
>> > what is conflicting.
>> >
>> > Keep in mind that when people try to contribute, they always look at the
>> > existing operators already in the repository as examples and likely a
>> model
>> > for their new operators.
>> >
>> > David
>> >
>> >
>> > On Fri, May 27, 2016 at 4:05 PM, Amol Kekre 
>> wrote:
>> >
>> > > Yes there are two conflicting threads now. The original thread was to
>> > open
>> > > up a way for contributors to submit code in a dir (contrib?) as long
>> as
>> > > license part of taken care of.
>> > >
>> > > On the thread of removing non-used operators -> How do we know what is
>> > > being used?
>> > >
>> > > Thks,
>> > > Amol
>> > >
>> > >
>> > > On Fri, May 27, 2016 at 3:40 PM, Sandesh Hegde <
>> sand...@datatorrent.com>
>> > > wrote:
>> > >
>> > > > +1 for removing the not-used operators.
>> > > >
>> > > > So we are creating a process for operator writers who don't want to
>> > > > understand the platform, yet wants to contribute? How big is that
>> set?
>> > > > If we tell the app-user, here is the code which has not passed all
>> the
>> > > > checklist, will they be ready to use that in production?
>> > > >
>> > > > This thread has 2 conflicting forces, reduce the operators and make
>> it
>> > > easy
>> > > > to add more operators.
>> > > >
>> > > >
>> > > >
>> > > > On Fri, May 27, 2016 at 3:03 PM Pramod Immaneni <
>> > pra...@datatorrent.com>
>> > > > 


Re: DataTorrent with SBT: .apa file not created

2016-07-11 Thread hsy...@gmail.com
I've never used SBT to build an Apex application, but I guess you can try two
things here:
use the sbt-maven-plugin
https://github.com/shivawu/sbt-maven-plugin
or use the sbt-assembly plugin
https://github.com/sbt/sbt-assembly

With the second approach, you need to translate the plugin configuration from
pom.xml into your sbt build scripts.
The configuration usually looks like this.

I hope this helps.


<plugin>
  <artifactId>maven-assembly-plugin</artifactId>
  <executions>
    <execution>
      <id>app-package-assembly</id>
      <phase>package</phase>
      <goals>
        <goal>single</goal>
      </goals>
      <configuration>
        <finalName>${project.artifactId}-${project.version}-apexapp</finalName>
        <appendAssemblyId>false</appendAssemblyId>
        <descriptors>
          <descriptor>src/assemble/appPackage.xml</descriptor>
        </descriptors>
        <archiverConfig>
          <defaultDirectoryMode>0755</defaultDirectoryMode>
        </archiverConfig>
        <archive>
          <manifestEntries>
            <Class-Path>${apex.apppackage.classpath}</Class-Path>
            <DT-Engine-Version>${apex.core.version}</DT-Engine-Version>
            <DT-App-Package-Group-Id>${apex.apppackage.groupid}</DT-App-Package-Group-Id>
            <DT-App-Package-Name>${project.artifactId}</DT-App-Package-Name>
            <DT-App-Package-Version>${project.version}</DT-App-Package-Version>
            <DT-App-Package-Display-Name>${project.name}</DT-App-Package-Display-Name>
            <DT-App-Package-Description>${project.description}</DT-App-Package-Description>
          </manifestEntries>
        </archive>
      </configuration>
    </execution>
  </executions>
</plugin>



On Mon, Jul 11, 2016 at 2:01 PM, Ankit Sarraf 
wrote:

> I am using SBT to create a DataTorrent Application. The project comprises
> of 2 parts. Part 1 is a Random Kafka Generator built using Scala. Part 2 is
> the DataTorrent Application (Java) to ingest data, process it, and write to
> HDFS.
>
> There are no errors while doing sbt assembly.
>
> Although, Uber JAR is created successfully, .apa file is not created. So
> does DataTorrent work with SBT?
>
> Thanks
> Ankit.
>


How do I turn off INFO log for embedded kafka server

2016-07-08 Thread hsy...@gmail.com
Hey guys,

I have some unit tests that have an embedded Kafka server running. I want to
skip all debug and info logs from the Kafka server, but having this set in
log4j.properties doesn't work. Some INFO logs still keep showing up, like this:

2016-07-08 18:01:14,288 [kafka-request-handler-4] INFO
cluster.Partition info - Partition [__consumer_offsets,4] on broker 0:
No checkpointed highwatermark is found for partition
[__consumer_offsets,4]
2016-07-08 18:01:14,286 [kafka-request-handler-2] INFO
cluster.Partition info - Partition [__consumer_offsets,39] on broker
100: No checkpointed highwatermark is found for partition
[__consumer_offsets,39]
2016-07-08 18:01:14,296 [kafka-request-handler-7] INFO  log.Log info -
Completed load of log __consumer_offsets-33 with log end offset 0
2016-07-08 18:01:14,310 [kafka-request-handler-2] INFO  log.Log info -
Completed load of log __consumer_offsets-4 with log end offset 0

Here is my setting:
log4j.logger.org.apache.kafka=WARN
log4j.logger.kafka=WARN
log4j.logger.org.apache.zookeeper=WARN


ZooKeeper is fine, and some of the logs from the Kafka brokers are also
skipped, but some of the logs are still showing up.
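
A programmatic workaround I could try is to raise the levels from the test
code itself before the embedded server starts (a rough sketch, assuming log4j
1.x is the active backend; the class name is just illustrative):

import org.apache.log4j.Level;
import org.apache.log4j.Logger;

public class EmbeddedKafkaLogLevels
{
  // Raise the kafka/clients/zookeeper loggers to WARN before the embedded server starts.
  public static void silenceKafka()
  {
    Logger.getLogger("kafka").setLevel(Level.WARN);
    Logger.getLogger("org.apache.kafka").setLevel(Level.WARN);
    Logger.getLogger("org.apache.zookeeper").setLevel(Level.WARN);
  }
}

But I'd prefer to make the log4j.properties setting itself work if possible.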


Thanks,

Siyuan


log4j setting for embedded kafka server

2016-06-24 Thread hsy...@gmail.com
Hi guys,

I start the server programmatically in my application using the
KafkaServerStartable.startup() method, and in the log4j.properties settings
I add this:

log4j.logger.org.apache.kafka=WARN
log4j.logger.kafka=WARN

But I always get INFO logs. Do you guys know how to enforce the log level
here? Thanks!


Re: kafka offset commit

2016-06-06 Thread hsy...@gmail.com
Raja,

Not exactly. Apex actually stores the offsets as part of the operator state,
and the state of the operator is checkpointed internally and periodically (in
HDFS by default). For more details, you can read this:
https://www.datatorrent.com/blog/blog-introduction-to-checkpoint/

With that said, the offsets are stored in HDFS along with the other state of
the operator so that it can recover in case of any system failure.
Also, in Apex you can do a stateful restart (start the application by
specifying the previous application id). It will initialize all operators,
load the checkpointed state (the offsets will be part of it) from HDFS and
continue running from that state. The only limitation is that you cannot
easily tell where the current offsets are. Hope this answers your question.
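
For 0.8, as mentioned further down in this thread, you can also plug in your
own OffsetManager to make the offsets visible outside the checkpointed state.
A rough sketch of the idea (the interface shape here is from memory and may
well differ from the actual Malhar signature, so treat it as an assumption):

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Assumed shape of the OffsetManager contract: loadInitialOffsets() is called on
// (re)start and updateOffsets() is called with the latest committed offsets.
interface OffsetManager
{
  Map<Integer, Long> loadInitialOffsets();        // kafka partition id -> starting offset
  void updateOffsets(Map<Integer, Long> offsets); // latest committed offsets
}

// Trivial in-memory implementation; a real one would persist to ZooKeeper, HDFS or a DB.
class InMemoryOffsetManager implements OffsetManager
{
  private final Map<Integer, Long> store = new ConcurrentHashMap<>();

  @Override
  public Map<Integer, Long> loadInitialOffsets()
  {
    return new HashMap<>(store);
  }

  @Override
  public void updateOffsets(Map<Integer, Long> offsets)
  {
    store.putAll(offsets);
  }
}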

Regards,
Siyuan


On Mon, Jun 6, 2016 at 4:57 PM, Raja.Aravapalli <raja.aravapa...@target.com>
wrote:

>
> Thanks Siyuan.
>
> So, to confirm, to apex is not storing offsets status at any location ?
> Like how Storm stores in Zookeeper ?
>
>
> Regards,
> Raja.
>
> From: "hsy...@gmail.com" <hsy...@gmail.com>
> Reply-To: "users@apex.apache.org" <users@apex.apache.org>
> Date: Monday, June 6, 2016 at 6:42 PM
>
> To: "users@apex.apache.org" <users@apex.apache.org>
> Subject: Re: kafka offset commit
>
> Hey Raja,
>
> For 0.8, you have to implement OffsetManager interface on your own. The
> updateOffsets will be called in application master every time when it get
> updated offsets from each physical partition. And the offsets that you see
> in the method is committed offset. So you can safely save these offsets
> into either zookeeper(0.8.2 client has API to do that) or any other
> datastore like DB or HDFS.  And also you have to implement the method
> loadInitialOffsets to load back offset you want.
>
> You are welcome to contribute a default implementation using buildin kafka
> offset commit request API for OffsetManager!
>
> Regards,
> Siyuan
>
> On Mon, Jun 6, 2016 at 3:36 PM, Raja.Aravapalli <
> raja.aravapa...@target.com> wrote:
>
>>
>> Hi Thomas,
>>
>> We are using 0.8 cluster still!!
>>
>>
>> Regards,
>> Raja.
>>
>> From: Thomas Weise <thomas.we...@gmail.com>
>> Reply-To: "users@apex.apache.org" <users@apex.apache.org>
>> Date: Monday, June 6, 2016 at 5:23 PM
>> To: "users@apex.apache.org" <users@apex.apache.org>
>> Subject: Re: kafka offset commit
>>
>> Hi Raja,
>>
>> Which Kafka version are you using?
>>
>> With the new 0.9 connector there is no need for the offset manager:
>>
>>
>> https://github.com/apache/apex-malhar/tree/master/kafka/src/main/java/org/apache/apex/malhar/kafka
>>
>> Thanks,
>> Thomas
>>
>>
>> On Mon, Jun 6, 2016 at 3:06 PM, Raja.Aravapalli <
>> raja.aravapa...@target.com> wrote:
>>
>>> Hi
>>>
>>> Can someone please help me understand, where will the offsets be stored
>>> when consuming with “*KafkaSinglePortStringInputOperator*”  ?
>>>
>>> And, how to handle restarts ?
>>>
>>>
>>> I worked with Storm earlier, Storm maintains the offsets in zookeeper
>>> and client id is maintained for every consumer, using which
>>>
>>> - we can see what is the current offset status for a given partition &
>>> modify them as well using zookeeper-cli !!
>>> - restarts can be handled
>>>
>>>
>>> As per the Apex documentation, I can see, that using OffsetManager we
>>> can handle the restarts effectively, but couldn’t find any examples to
>>> refer…
>>>
>>> How clientId can be used to retrieve offsets status
>>> And ability to edit the offsets etc
>>>
>>> can someone pls help me find this ?
>>>
>>>
>>> Thanks a lot!!
>>>
>>>
>>> -Regards,
>>> Raja.
>>>
>>>
>>>
>>>
>>
>


Log level for consumer properties

2016-05-04 Thread hsy...@gmail.com
Hi,

Right now, when we initialize a Kafka consumer, it always logs the consumer
properties at INFO level. Can we move that to DEBUG level? I have to
periodically create consumer instances just to pull some metadata for a
topic, and I don't want to get this noisy log.

Regards,
Siyuan


Re: javax.persistence.Persistence classNotFoundError

2016-05-02 Thread hsy...@gmail.com
Hey Ananth,

Can you unzip your apa file to see if it is in the lib folder? If it is not
there, it means the packaging is not correct.

Regards,
Siyuan

On Mon, May 2, 2016 at 4:39 AM, Ananth Gundabattula  wrote:

> Hello All,
>
> I was able to get around the issue pasted above by manually copying the
> javaee-api-7.0 jar inside the lib folder of Apex engine. It may be noted
> that the code works perfectly fine in a unit test that launches all of the
> operators in the DAG. ( Works end to end ) . However when I launch it on
> CDH 5.7 I run into the exception pasted above. It may be noted that the
> exception is being raised when I launch the app and the Apex engine is
> trying to validate the newly deployed app.
>
> I am suspecting that this could be an issue with incompatibility with CDH
> 5.7 ( The Hadoop stack on which I am trying to run Apex 3.3 on )
>
> Could anyone help me if they were able to run Apex on top of CDH 5.7. I
> got the same exception with both Apex 3.3 and 3.2.
>
> Regards,
> Ananth
>
> On Sat, Apr 30, 2016 at 11:34 AM, Ananth Gundabattula <
> agundabatt...@gmail.com> wrote:
>
>> Hello All,
>>
>> I am getting the following exception when I launch my Apex app.
>>
>> I tried including javaee-api- version 7.0 as well to get around this
>> error but does not seem to take effect.
>>
>> Any ideas why javax.persistence.Persistence is not getting loaded by the
>> APex engine classloader ?
>>
>> An error occurred trying to launch the application. Server message:
>>> java.lang.NoClassDefFoundError: javax/persistence/Persistence at
>>> org.apache.bval.jsr303.resolver.JPATraversableResolver.isReachable(JPATraversableResolver.java:34)
>>> at
>>> org.apache.bval.jsr303.resolver.DefaultTraversableResolver.isReachable(DefaultTraversableResolver.java:60)
>>> at
>>> org.apache.bval.jsr303.resolver.CachingTraversableResolver.isReachable(CachingTraversableResolver.java:82)
>>> at
>>> org.apache.bval.jsr303.ConstraintValidation.isReachable(ConstraintValidation.java:241)
>>> at
>>> org.apache.bval.jsr303.ConstraintValidation.validate(ConstraintValidation.java:166)
>>> at
>>> org.apache.bval.jsr303.ConstraintValidation.validate(ConstraintValidation.java:141)
>>> at
>>> org.apache.bval.util.ValidationHelper.validateProperty(ValidationHelper.java:233)
>>> at
>>> org.apache.bval.util.ValidationHelper.validateBean(ValidationHelper.java:216)
>>> at
>>> org.apache.bval.jsr303.ClassValidator.validateBeanNet(ClassValidator.java:393)
>>> at org.apache.bval.jsr303.ClassValidator.validate(ClassValidator.java:149)
>>> at
>>> com.datatorrent.stram.plan.logical.LogicalPlan.validate(LogicalPlan.java:1672)
>>> at com.datatorrent.stram.StramClient.(StramClient.java:161) at
>>> com.datatorrent.stram.client.StramAppLauncher.launchApp(StramAppLauncher.java:509)
>>> at com.datatorrent.stram.cli.DTCli$LaunchCommand.execute(DTCli.java:2050)
>>> at com.datatorrent.stram.cli.DTCli.launchAppPackage(DTCli.java:3456) at
>>> com.datatorrent.stram.cli.DTCli.access$7100(DTCli.java:106) at
>>> com.datatorrent.stram.cli.DTCli$LaunchCommand.execute(DTCli.java:1895) at
>>> com.datatorrent.stram.cli.DTCli$3.run(DTCli.java:1449) Fatal error
>>> encountered
>>
>>
>> Regards,
>> Ananth
>>
>
>


Re: Apex engine and malhar maven versions for Kafka 0.9 version operator

2016-04-16 Thread hsy...@gmail.com
Hi Ananth,

Could you please try to add malhar-library in test scope? Thanks!

Regards,
Siyuan

On Saturday, April 16, 2016, Ananth Gundabattula 
wrote:

> Hello all,
>
> I would like to run a unit test code using the new Kafka Operator that
> supports the 0.9 version protocol.
>
> In this process, I included the malhar-kafka library version (
> 3.3.1-incubating ) and am using the apex-engine ( version 3.3.0 ) with
>  as test/provided.
>
> The compilation works fine but my unit tests fail to run properly with "
> java.lang.ClassNotFoundException: com.datatorrent.lib.util.KryoCloneUtils"
> exception.
>
>
> What is the recommended way to run a unit test which uses Kafka 0.9
> operator integrated with the Apex engine ? I am assuming that the
> malhar-contrib library kafka operator is not 0.9 compliant ..
>
> The unit test code is like this :
>
> The class CassandraEventDetailsStreamingApp extends the
> AbstractKafkaInputOperator in the below snippet of code.
>
> The exception arises in the method lma.getController();
>
> @Test
> public void testApplication() throws IOException, Exception {
> try {
> LocalMode lma = LocalMode.newInstance();
> Configuration conf = new Configuration(false);
> 
> conf.addResource(this.getClass().getResourceAsStream("/dag-test-props.xml"));
> lma.prepareDAG(new CassandraEventDetailsStreamingApp(), conf);
> LocalMode.Controller lc = lma.getController();
> lc.run();
> } catch (ConstraintViolationException e) {
> Assert.fail("constraint violations: " + e.getConstraintViolations());
> }
> }
>
>
>
> Regards,
> Ananth
>


Exceptions when programmatically start multiple kafka brokers

2015-12-21 Thread hsy...@gmail.com
I'm trying to start 2 brokers in my Kafka ingestion unit test and I got an
exception:

javax.management.InstanceAlreadyExistsException:
kafka.server:type=app-info,id=0
at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
at
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
at
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
at
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
at
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
at
com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
at
org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:57)
at kafka.server.KafkaServer.startup(KafkaServer.scala:239)
at
kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:37)
at
org.apache.apex.malhar.kafka.KafkaOperatorTestBase.startKafkaServer(KafkaOperatorTestBase.java:133)
at
org.apache.apex.malhar.kafka.KafkaOperatorTestBase.startKafkaServer(KafkaOperatorTestBase.java:143)
at
org.apache.apex.malhar.kafka.KafkaOperatorTestBase.beforeTest(KafkaOperatorTestBase.java:175)
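
The brokers are started roughly like this (a simplified sketch, not the exact
test code; each broker gets its own broker.id, port and log.dirs):

import java.util.Properties;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;

public class EmbeddedBrokers
{
  // Start one embedded broker; called twice with different ids/ports/log dirs.
  static KafkaServerStartable startBroker(int brokerId, int port, String logDir, String zkConnect)
  {
    Properties props = new Properties();
    props.put("broker.id", Integer.toString(brokerId));
    props.put("port", Integer.toString(port));
    props.put("log.dirs", logDir);
    props.put("zookeeper.connect", zkConnect);
    KafkaServerStartable broker = new KafkaServerStartable(new KafkaConfig(props));
    broker.startup();
    return broker;
  }
}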

Is it caused by the JMX metrics reporter?
It doesn't affect any functionality we need, but it is annoying.
How can I disable it?

Thanks!


Re: how to programatically monitor Kafka availability

2015-12-17 Thread hsy...@gmail.com
Hey Hohl,

I use the *partitionsFor* method to monitor the partition info for particular
topics.
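
Roughly like this (a simplified sketch; a failed or empty partitionsFor() call
is treated as "cluster not reachable for this topic"):

import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class TopicAvailabilityCheck
{
  public static boolean isAvailable(String bootstrapServers, String topic)
  {
    Properties props = new Properties();
    props.put("bootstrap.servers", bootstrapServers);
    props.put("key.deserializer", ByteArrayDeserializer.class.getName());
    props.put("value.deserializer", ByteArrayDeserializer.class.getName());
    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
      List<PartitionInfo> partitions = consumer.partitionsFor(topic);
      return partitions != null && !partitions.isEmpty();
    } catch (Exception e) {
      return false; // brokers unreachable, metadata fetch failed, etc.
    }
  }
}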



On Tue, Dec 15, 2015 at 11:27 AM, Hohl, Ken  wrote:

> We want to be able to monitor the ability to send messages to Kafka
> topics.  We want to be aware of the inability to do so before the time we
> attempt to send a message.  What we're looking for is something like a
> heartbeat.  The reason we need this is that in our deployment environment,
> Kafka and its clients will not be co-located.  As such, network issues
> could cause Kafka to not be available to its client.
>
> We've considered using Zookeeper that's already managing the Kafka cluster
> but have not been able to determine exactly how we would use it.
>
> We've also considered requesting a JMX MBean periodically and concluding
> the cluster is not accessible if we can't get the MBean from at least 1
> broker.
>
> What is the recommended way of accomplishing what we're trying to do?
>
> Thanks.
>
> Ken Hohl
> Cars.com
>
>


Where can I find the document for consumer metrics

2015-12-17 Thread hsy...@gmail.com
I can find some broker/producer metrics here:
http://kafka.apache.org/documentation.html#monitoring

but where can I find the consumer metrics docs?

Every time, I have to log this to find out which metrics I want:
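
(This is roughly how I produce the dump, a simplified sketch that just prints
the registered metric names and descriptions from consumer.metrics():)

import java.util.Map;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

public class ConsumerMetricNames
{
  // Pass in consumer.metrics() from an already-constructed KafkaConsumer.
  public static void print(Map<MetricName, ? extends Metric> metrics)
  {
    for (MetricName n : metrics.keySet()) {
      System.out.println(n.group() + " / " + n.name() + " : " + n.description());
    }
  }
}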

MetricName [name=join-rate, group=consumer-coordinator-metrics,
description=The number of group joins per second,
tags={client-id=consumer-4}]=org.apache.kafka.common.metrics.KafkaMetric@ebea716;MetricName
[name=fetch-size-avg, group=consumer-fetch-manager-metrics, description=The
average number of bytes fetched per request,
tags={client-id=consumer-4}]=org.apache.kafka.common.metrics.KafkaMetric@6cb9cea;MetricName
[name=commit-latency-avg, group=consumer-coordinator-metrics,
description=The average time taken for a commit request,
tags={client-id=consumer-4}]=org.apache.kafka.common.metrics.KafkaMetric@21aaca22;MetricName
[name=join-time-avg, group=consumer-coordinator-metrics, description=The
average time taken for a group rejoin,
tags={client-id=consumer-4}]=org.apache.kafka.common.metrics.KafkaMetric@53bc8f72;MetricName
[name=incoming-byte-rate, group=consumer-metrics, description=Bytes/second
read off all sockets,
tags={client-id=consumer-4}]=org.apache.kafka.common.metrics.KafkaMetric@34f9c3e0;MetricName
[name=bytes-consumed-rate, group=consumer-fetch-manager-metrics,
description=The average number of bytes consumed per second,
tags={client-id=consumer-4}]=org.apache.kafka.common.metrics.KafkaMetric@36c7401a;MetricName
[name=response-rate, group=consumer-metrics, description=Responses received
sent per second.,
tags={client-id=consumer-4}]=org.apache.kafka.common.metrics.KafkaMetric@5341870e;MetricName
[name=connection-creation-rate, group=consumer-metrics, description=New
connections established per second in the window.,
tags={client-id=consumer-4}]=org.apache.kafka.common.metrics.KafkaMetric@9f0d8f4;MetricName
[name=fetch-rate, group=consumer-fetch-manager-metrics, description=The
number of fetch requests per second.,
tags={client-id=consumer-4}]=org.apache.kafka.common.metrics.KafkaMetric@23338045;MetricName
[name=join-time-max, group=consumer-coordinator-metrics, description=The
max time taken for a group rejoin,
tags={client-id=consumer-4}]=org.apache.kafka.common.metrics.KafkaMetric@5cdabd4d;MetricName
[name=io-wait-ratio, group=consumer-metrics, description=The fraction of
time the I/O thread spent waiting.,
tags={client-id=consumer-4}]=org.apache.kafka.common.metrics.KafkaMetric@659b7186;MetricName
[name=fetch-size-max, group=consumer-fetch-manager-metrics, description=The
maximum number of bytes fetched per request,
tags={client-id=consumer-4}]=org.apache.kafka.common.metrics.KafkaMetric@403a4887;MetricName
[name=assigned-partitions, group=consumer-coordinator-metrics,
description=The number of partitions currently assigned to this consumer,
tags={client-id=consumer-4}]=org.apache.kafka.common.metrics.KafkaMetric@326fb802;MetricName
[name=io-time-ns-avg, group=consumer-metrics, description=The average
length of time for I/O per select call in nanoseconds.,
tags={client-id=consumer-4}]=org.apache.kafka.common.metrics.KafkaMetric@432b0ee3;MetricName
[name=records-consumed-rate, group=consumer-fetch-manager-metrics,
description=The average number of records consumed per second,
tags={client-id=consumer-4}]=org.apache.kafka.common.metrics.KafkaMetric@3fde7b88;MetricName
[name=io-wait-time-ns-avg, group=consumer-metrics, description=The average
length of time the I/O thread spent waiting for a socket ready for reads or
writes in nanoseconds.,
tags={client-id=consumer-4}]=org.apache.kafka.common.metrics.KafkaMetric@5485cfd8;MetricName
[name=select-rate, group=consumer-metrics, description=Number of times the
I/O layer checked for new I/O to perform per second,
tags={client-id=consumer-4}]=org.apache.kafka.common.metrics.KafkaMetric@2cbdcaf6;MetricName
[name=fetch-throttle-time-max, group=consumer-fetch-manager-metrics,
description=The maximum throttle time in ms,
tags={client-id=consumer-4}]=org.apache.kafka.common.metrics.KafkaMetric@6057f36c;MetricName
[name=heartbeat-response-time-max, group=consumer-coordinator-metrics,
description=The max time taken to receive a response to a heartbeat
request,
tags={client-id=consumer-4}]=org.apache.kafka.common.metrics.KafkaMetric@2e2e68de;MetricName
[name=network-io-rate, group=consumer-metrics, description=The average
number of network operations (reads or writes) on all connections per
second.,
tags={client-id=consumer-4}]=org.apache.kafka.common.metrics.KafkaMetric@68e6de81;MetricName
[name=fetch-latency-max, group=consumer-fetch-manager-metrics,
description=The max time taken for any fetch request.,
tags={client-id=consumer-4}]=org.apache.kafka.common.metrics.KafkaMetric@d1a1cf5;MetricName
[name=request-size-avg, group=consumer-metrics, description=The average
size of all requests in the window..,
tags={client-id=consumer-4}]=org.apache.kafka.common.metrics.KafkaMetric@2d631f8b;MetricName
[name=commit-rate, group=consumer-coordinator-metrics, 

Re: Kafka 0.9 consumer API question

2015-12-17 Thread hsy...@gmail.com
Hi Rajiv,

I think it makes sense to return a read-only assignment. What we could
improve here is to add an addPartition method to the consumer.
Then we wouldn't have to do any operations on the assignment returned by the
assignment() method.

BTW,
I think you can implement the PartitionAssignor interface to solve your use
case.
I couldn't find the javadoc for that interface, but here is the method you can
use:

/**
 * Perform the group assignment given the member subscriptions and
current cluster metadata.
 * @param metadata Current topic/broker metadata known by consumer
 * @param subscriptions Subscriptions from all members provided through
{@link #subscription(Set)}
 * @return A map from the members to their respective assignment. This
should have one entry
 * for all members who in the input subscription map.
 */
Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions);

The subscription map has each consumer's member id as key. It can be used
as a reference to the consumer and you can adjust the assignments there.




On Tue, Dec 15, 2015 at 2:53 PM, Rajiv Kurian  wrote:

> Hi Jason,
>
> The copying is not a problem in terms of performance. It's just annoying to
> write the extra code. My point with the copy is that since the client is
> already making a copy when it returns the set to me, why would it matter if
> I modify the copy. Creating an unmodifiable set on top of a copy seems
> redundant. It would be easiest for us as users to do something like this:
>
> final Set partitions = consumer.assignment();  // This
> already returns a copy of the underlying assignment, thus ensuring that the
> internal data structures are protected.
> partitions.add(myNewTopicPartition);  // This is fine to modify since
> consumer.assignment() returns a copy.
> partitions.remove(topicPartitionToBeRemoved);
> consumer.assign(partitions);
>
> Instead we have to do something like this right now.
>
> final Set partitions = consumer.assignment();  // This
> returns a copy of the underlying assignment wrapped in an UnmodifiableSet
> which seems redundant.
> final Set yetAnotherCopy = new HashSet<>(partitions);  //
> We need this copy since consumer.assignment() is unmodifiable, even though
> it is a copy.
> yetAnotherCopy.add(myNewTopicPartition);
> yetAnotherCopy.remove(topicPartitionToBeRemoved);
> List wayTooManyCopies = new ArrayList<>(yetAnotherCopy);
> consumer.assign(wayTooManyCopies);
>
> Thanks,
> Rajiv
>
>
> On Tue, Dec 15, 2015 at 2:35 PM, Jason Gustafson 
> wrote:
>
> > Hey Rajiv,
> >
> > I agree the Set/List inconsistency is a little unfortunate (another
> > annoying one is pause() which uses a vararg). I think we should probably
> > add the following variants:
> >
> > assign(Collection)
> > subscribe(Collection)
> > pause(Collection)
> >
> > I can open a JIRA to fix this. As for returning the unmodifiable set, I
> can
> > see your point, but I think it's a little dangerous for user code to
> depend
> > on being able to modify a collection returned from the API. Making it
> > immutable reduces the coupling with user code and gives us more freedom
> in
> > the future (not that we have any intention of changing the set type, but
> we
> > could). I think the way I might try to implement your use case would be
> to
> > maintain the assignment set yourself. You can make changes to that set
> and
> > always pass it to assign(), which would avoid the need to use
> assignment().
> > Also, I probably wouldn't be overly concerned about the copying overhead
> > unless profiling shows that it is actually a problem. Are your partition
> > assignments generally very large?
> >
> > -Jason
> >
> >
> > On Tue, Dec 15, 2015 at 1:32 PM, Rajiv Kurian 
> wrote:
> >
> > > We are trying to use the Kafka 0.9 consumer API to poll specific
> > > partitions. We consume partitions based on our own logic instead of
> > > delegating that to Kafka. One of our use cases is handling a change in
> > the
> > > partitions that we consume. This means that sometimes we need to
> consume
> > > additional partitions and other times we need to stop consuming (not
> > pause
> > > but stop entirely) some of the partitions that we are currently
> polling.
> > >
> > > The semantics of the assign() call at
> > >
> > >
> >
> http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
> > > is that we need to provide the entire list of subscriptions. So when we
> > > want to add or remove partitions we call the assignment() method to get
> > the
> > > existing set of TopicPartitions being polled, and then modify this set
> > and
> > > pass it back to the assign() call. However it seems weird that the
> > assign()
> > > call takes a List whereas the assignment call returns
> a
> > > Set. Further the Set returned by the method is an
> > > unmodifiable set which means to change this set we need to create a 

partitionsFor method doesn't return latest partition metadata

2015-11-30 Thread hsy...@gmail.com
Hi guys,

I want to use the partitionsFor method of the new consumer API periodically to
monitor partition metadata changes. It seems it only issues a remote call
to the server the first time. If I add partitions after that,
partitionsFor returns stale values. Is there a way to reuse the consumer
object and refresh the metadata? Thanks!
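
For reference, this is roughly the monitoring loop I have in mind (a
simplified sketch; setting metadata.max.age.ms low is an idea taken from its
documentation, and I haven't verified that it actually applies to
partitionsFor()):

import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class PartitionCountMonitor
{
  public static void watch(String bootstrapServers, String topic) throws InterruptedException
  {
    Properties props = new Properties();
    props.put("bootstrap.servers", bootstrapServers);
    props.put("metadata.max.age.ms", "5000"); // hoping this forces a metadata refresh every 5s
    props.put("key.deserializer", ByteArrayDeserializer.class.getName());
    props.put("value.deserializer", ByteArrayDeserializer.class.getName());
    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
      int lastCount = -1;
      while (true) {
        List<PartitionInfo> partitions = consumer.partitionsFor(topic);
        int count = partitions == null ? 0 : partitions.size();
        if (count != lastCount) {
          System.out.println("partition count for " + topic + " is now " + count);
          lastCount = count;
        }
        Thread.sleep(5000);
      }
    }
  }
}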

regards,
Siyuan


Is 0.9 new consumer API compatible with 0.8.x.x broker

2015-11-30 Thread hsy...@gmail.com
Is the 0.9 new consumer API compatible with 0.8.x.x brokers?


Re: 0.9.0.0 RC4

2015-11-23 Thread hsy...@gmail.com
In http://kafka.apache.org/090/documentation.html#newconsumerconfigs,
shouldn't partition.assignment.strategy be a string, not a list of strings?

On Fri, Nov 20, 2015 at 5:21 PM, Jun Rao  wrote:

> This is the fourth candidate for release of Apache Kafka 0.9.0.0. This a
> major release that includes (1) authentication (through SSL and SASL) and
> authorization, (2) a new java consumer, (3) a Kafka connect framework for
> data ingestion and egression, and (4) quotas. Since this is a major
> release, we will give people a bit more time for trying this out.
>
> Release Notes for the 0.9.0.0 release
>
> https://people.apache.org/~junrao/kafka-0.9.0.0-candidate4/RELEASE_NOTES.html
>
> *** Please download, test and vote by Monday, Nov. 23, 6pm PT
>
> Kafka's KEYS file containing PGP keys we use to sign the release:
> http://kafka.apache.org/KEYS in addition to the md5, sha1
> and sha2 (SHA256) checksum.
>
> * Release artifacts to be voted upon (source and binary):
> https://people.apache.org/~junrao/kafka-0.9.0.0-candidate4/
>
> * Maven artifacts to be voted upon prior to release:
> https://repository.apache.org/content/groups/staging/
>
> * scala-doc
> https://people.apache.org/~junrao/kafka-0.9.0.0-candidate4/scaladoc/
>
> * java-doc
> https://people.apache.org/~junrao/kafka-0.9.0.0-candidate4/javadoc/
>
> * The tag to be voted upon (off the 0.9.0 branch) is the 0.9.0.0 tag
>
> https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=132943b0f83831132cd46ac961cf6f1c00132565
>
> * Documentation
> http://kafka.apache.org/090/documentation.html
>
> /***
>
> Thanks,
>
> Jun
>


Re: Commit offsets only work for subscribe(), not assign()

2015-11-23 Thread hsy...@gmail.com
Hey Jason,

The test I did is very simple: I was using manual assignment with its own
group id and client id. I first started a process to consume data, then
produced some data, then killed the process, continued producing more data and
started the process again. I didn't see anything from the time the process
was killed. Do I have to set "auto.offset.reset" to "none"? Thanks
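
In other words, on restart I was expecting to be able to pick up from the last
committed offsets with something like this (a rough sketch; whether the client
already resumes from the committed offset automatically for manual assignment
is exactly what I'm unsure about):

import java.util.Collections;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class ResumeFromCommitted
{
  public static void resume(KafkaConsumer<byte[], byte[]> consumer, String topic, int partition)
  {
    TopicPartition tp = new TopicPartition(topic, partition);
    consumer.assign(Collections.singletonList(tp));
    OffsetAndMetadata committed = consumer.committed(tp);
    if (committed != null) {
      consumer.seek(tp, committed.offset()); // continue where the killed process left off
    }
    // else: nothing committed yet, so auto.offset.reset decides where to start
  }
}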

On Fri, Nov 20, 2015 at 3:56 PM, Jason Gustafson <ja...@confluent.io> wrote:

> I suppose I should have added one qualification to that. The commit API
> will not work for a consumer using manual assignment if its groupId is
> shared with another consumer using automatic assignment (with subscribe()).
> When a consumer group is active, Kafka only allows commits from members of
> that group.
>
> -Jason
>
> On Fri, Nov 20, 2015 at 3:41 PM, Jason Gustafson <ja...@confluent.io>
> wrote:
>
> > Hey Siyuan,
> >
> > The commit API should work the same regardless whether subscribe() or
> > assign() was used. Does this not appear to be working?
> >
> > Thanks,
> > Jason
> >
> > On Wed, Nov 18, 2015 at 4:40 PM, hsy...@gmail.com <hsy...@gmail.com>
> > wrote:
> >
> >> In the new API, the explicit commit offset method call only works for
> >> subscribe consumer, not the assign consumer, correct?
> >>
> >> Best,
> >> Siyuan
> >>
> >
> >
>


Re: Questions about new consumer API

2015-11-18 Thread hsy...@gmail.com
That sounds like a good suggestion. I'm actually looking at the code and I
will start another thread for questions about that.

On Tue, Nov 17, 2015 at 5:42 PM, Jason Gustafson <ja...@confluent.io> wrote:

> Thanks for the explanation. Certainly you'd use less connections with this
> approach, but it might be worthwhile to do some performance analysis to see
> whether there is much difference in throughput (I'd be interested in seeing
> these results myself). Another approach that might be interesting would be
> to implement your own partition assignor which took into account the
> leaders of each partition. Then you could just use subscribe() and let
> Kafka manage the group for you. This is similar to how we were thinking of
> implementing consumer rack-awareness.
>
> -Jason
>
> On Tue, Nov 17, 2015 at 4:04 PM, hsy...@gmail.com <hsy...@gmail.com>
> wrote:
>
> > By efficiency, I mean maximize throughput while minimize resources on
> both
> > broker sides and consumer sides.
> >
> > One example is if you have over 200 partitions on 10 brokers and you can
> > start 5 consumer processes to consume data, if each one is single-thread
> > and you do round-robin to distribute the load then each one will try to
> > fetch from over 40 partitions one by one through 10 connections
> > possibly(overall is 50),  but if it's smart enough to group partitions by
> > brokers, each process can have 2 separate threads(consuming from 2
> > different brokers concurrently). That seems a more optimal solution than
> > another, right?
> >
> > On Tue, Nov 17, 2015 at 2:54 PM, Jason Gustafson <ja...@confluent.io>
> > wrote:
> >
> > > Hi Siyuan,
> > >
> > > Your understanding about assign/subscribe is correct. We think of topic
> > > subscription as enabling automatic assignment as opposed to doing
> manual
> > > assignment through assign(). We don't currently them to be mixed.
> > >
> > > Can you elaborate on your findings with respect to using one thread per
> > > broker? In what sense was it more efficient? Doing the same thing might
> > be
> > > tricky with the new consumer, but I think you could do it using
> > > partitionsFor() to find the current partition leaders and assign() to
> set
> > > the assignment in each thread.
> > >
> > > -Jason
> > >
> > > On Tue, Nov 17, 2015 at 10:25 AM, hsy...@gmail.com <hsy...@gmail.com>
> > > wrote:
> > >
> > > > Thanks Guozhang,
> > > >
> > > > Maybe I should give a few words about what I'm going to achieve with
> > new
> > > > API
> > > >
> > > > Currently, I'm building a new kafka connector for Apache Apex(
> > > > http://apex.incubator.apache.org/) using 0.9.0 API
> > > > Apex support dynamic partition, so in the old version, We manage all
> > the
> > > > consumer partitions in either 1:1 strategy (each consumer process
> > > consumes
> > > > only from one kafka partition) or 1:n strategy (each consumer process
> > > could
> > > > consume from multiple kafka partitions, using round-robin to
> > distribute)
> > > > And we also have separate thread to monitor topic metadata
> > change(leader
> > > > broker change, new partition added, using internal API like ZkUtil
> etc)
> > > > and do dynamic partition based on that(for example auto-reconnect to
> > new
> > > > leader broker, create new partition to consume from new kafka
> partition
> > > at
> > > > runtime).  You can see High-level consumer doesn't work(It can only
> > > balance
> > > > between existing consumers unless you manually add new one)  I'm
> > thinking
> > > > if the new consumer could be used to save some work we did before.
> > > >
> > > > I'm still confused with assign() and subscribe().  My understanding
> is
> > if
> > > > you use assign() only, the consumer becomes more like a simple
> consumer
> > > > except if the leader broker changes it automatically reconnect to the
> > new
> > > > leader broker, is it correct?   If you use subscribe() method only
> then
> > > all
> > > > the partitions will be distributed to running consumer process with
> > same
> > > "
> > > > group.id" using "partition.assignment.strategy". Is it true?
> > > >
> > > > So I assume assign() and subscribe()(and group.id
> > > > partition.assignment.

Q about PartitionAssignor

2015-11-18 Thread hsy...@gmail.com
Hey guys,

I saw that PartitionAssignor is not in the public API docs and the package name
is internals.

Does that mean this API is not stable and could be changed even in a minor
release?

And in the assign method signature, the key of the "subscriptions" map is
memberId. What is memberId, and can I manually set the id to identify a member?
I want to do some sticky assignment.


Thanks!

Best,
Siyuan


Commit offsets only work for subscribe(), not assign()

2015-11-18 Thread hsy...@gmail.com
In the new API, the explicit commit-offset method call only works for a
subscribe() consumer, not an assign() consumer, correct?

Best,
Siyuan


Re: Q about PartitionAssignor

2015-11-18 Thread hsy...@gmail.com
Thanks Guozhang. What is userData for in the Subscription?

On Wed, Nov 18, 2015 at 12:05 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Currently the whole KafkaConsumer interface is tagged as "
> @InterfaceStability.Unstable", meaning that the API may change in the
> future. We have been very careful to make any dramatic public API changes
> but still cannot guarantee this will not happen.
>
> Member-Id is assigned by the server-side coordinator upon accepting the
> consumer to join the specified group, hence it cannot be manually set. But
> the memberId will not change as long as the consumer is still part of the
> members of the group, so you want to do some sticky assignment you can just
> remember the memberId -> partitions map on the consumer side in some
> persistent storage so that even when the leader who does the assignment has
> failed over other new leaders can still access the past assignment history.
>
> Guozhang
>
>
>
> On Wed, Nov 18, 2015 at 9:02 AM, hsy...@gmail.com <hsy...@gmail.com>
> wrote:
>
> > Hey guys,
> >
> > I saw the PartitionAssignor is not in public doc API and the package name
> > is internals.
> >
> > Does it mean this API is not stable and could be changed even in minor
> > release?
> >
> > And in the assign method signature, the key for the "subscription" map is
> > memberId, what is memberId, can I manually set the id to identify member?
> > I want to do some sticky assignment.
> >
> >
> > Thanks!
> >
> > Best,
> > Siyuan
> >
>
>
>
> --
> -- Guozhang
>


Re: build error

2015-11-17 Thread hsy...@gmail.com
I got a "main class not found" error, so I installed Gradle 2.5 and ran gradle
build (not the wrapper).

On Mon, Nov 16, 2015 at 10:17 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Did you just use "./gradlew build" in root directory?
>
> Guozhang
>
> On Mon, Nov 16, 2015 at 6:41 PM, hsy...@gmail.com <hsy...@gmail.com>
> wrote:
>
> > The actual thing I want to do is I want to build and install in my local
> > maven repository so I can include new api in my dependencies. When the
> > release is officially out, I can have both my code ready with the
> official
> > maven dependency
> >
> > Thanks,
> > Siyuan
> >
> > On Monday, November 16, 2015, Grant Henke <ghe...@cloudera.com> wrote:
> >
> > > Hi Siyuan,
> > >
> > > My guess is that you are trying to build from a subdirectory. I have a
> > > minor patch available to fix this that has not been pulled in yet here:
> > > https://github.com/apache/kafka/pull/509
> > >
> > > In the mean time, if you need to build a subproject you can execute a
> > > command like the following:
> > > gradle clients:build
> > >
> > > Thanks,
> > > Grant
> > >
> > > On Mon, Nov 16, 2015 at 6:33 PM, Guozhang Wang <wangg...@gmail.com
> > > <javascript:;>> wrote:
> > >
> > > > Siyuan,
> > > >
> > > > Which command did you use to build?
> > > >
> > > > Guozhang
> > > >
> > > > On Mon, Nov 16, 2015 at 4:01 PM, hsy...@gmail.com <javascript:;> <
> > > hsy...@gmail.com <javascript:;>>
> > > > wrote:
> > > >
> > > > > I got a build error on both trunk and 0.9.0 branch
> > > > >
> > > > > > docs/producer_config.html (No such file or directory)
> > > > >
> > > > > Do I miss anything before build
> > > > >
> > > > > Thanks,
> > > > > Siyuan
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> > >
> > >
> > > --
> > > Grant Henke
> > > Software Engineer | Cloudera
> > > gr...@cloudera.com <javascript:;> | twitter.com/gchenke |
> > > linkedin.com/in/granthenke
> > >
> >
>
>
>
> --
> -- Guozhang
>


Re: build error

2015-11-17 Thread hsy...@gmail.com
And I couldn't find the wrapper jar files under the gradle folder:
https://github.com/apache/kafka/tree/0.9.0/gradle



On Mon, Nov 16, 2015 at 10:17 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Did you just use "./gradlew build" in root directory?
>
> Guozhang
>
> On Mon, Nov 16, 2015 at 6:41 PM, hsy...@gmail.com <hsy...@gmail.com>
> wrote:
>
> > The actual thing I want to do is I want to build and install in my local
> > maven repository so I can include new api in my dependencies. When the
> > release is officially out, I can have both my code ready with the
> official
> > maven dependency
> >
> > Thanks,
> > Siyuan
> >
> > On Monday, November 16, 2015, Grant Henke <ghe...@cloudera.com> wrote:
> >
> > > Hi Siyuan,
> > >
> > > My guess is that you are trying to build from a subdirectory. I have a
> > > minor patch available to fix this that has not been pulled in yet here:
> > > https://github.com/apache/kafka/pull/509
> > >
> > > In the mean time, if you need to build a subproject you can execute a
> > > command like the following:
> > > gradle clients:build
> > >
> > > Thanks,
> > > Grant
> > >
> > > On Mon, Nov 16, 2015 at 6:33 PM, Guozhang Wang <wangg...@gmail.com
> > > <javascript:;>> wrote:
> > >
> > > > Siyuan,
> > > >
> > > > Which command did you use to build?
> > > >
> > > > Guozhang
> > > >
> > > > On Mon, Nov 16, 2015 at 4:01 PM, hsy...@gmail.com <javascript:;> <
> > > hsy...@gmail.com <javascript:;>>
> > > > wrote:
> > > >
> > > > > I got a build error on both trunk and 0.9.0 branch
> > > > >
> > > > > > docs/producer_config.html (No such file or directory)
> > > > >
> > > > > Do I miss anything before build
> > > > >
> > > > > Thanks,
> > > > > Siyuan
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> > >
> > >
> > > --
> > > Grant Henke
> > > Software Engineer | Cloudera
> > > gr...@cloudera.com <javascript:;> | twitter.com/gchenke |
> > > linkedin.com/in/granthenke
> > >
> >
>
>
>
> --
> -- Guozhang
>


Re: Questions about new consumer API

2015-11-17 Thread hsy...@gmail.com
Thanks Guozhang,

Maybe I should give a few words about what I'm trying to achieve with the new
API.

Currently, I'm building a new Kafka connector for Apache Apex (
http://apex.incubator.apache.org/) using the 0.9.0 API.
Apex supports dynamic partitioning, so in the old version we manage all the
consumer partitions with either a 1:1 strategy (each consumer process consumes
from only one Kafka partition) or a 1:n strategy (each consumer process can
consume from multiple Kafka partitions, using round-robin to distribute them).
We also have a separate thread that monitors topic metadata changes (leader
broker change, new partition added, using internal APIs like ZkUtil, etc.)
and does dynamic partitioning based on that (for example, auto-reconnecting to
a new leader broker, or creating a new partition to consume from a new Kafka
partition at runtime). You can see the high-level consumer doesn't work here
(it can only balance between existing consumers unless you manually add a new
one). I'm wondering whether the new consumer could be used to save some of the
work we did before.

I'm still confused about assign() and subscribe(). My understanding is that if
you use assign() only, the consumer becomes more like a simple consumer,
except that if the leader broker changes it automatically reconnects to the
new leader broker. Is that correct? If you use the subscribe() method only,
then all the partitions will be distributed over the running consumer
processes with the same "group.id" using "partition.assignment.strategy". Is
that true?

So I assume assign() and subscribe() (and the group.id /
partition.assignment.strategy settings) cannot be used together?

Also, with the old API we found one thread per broker is the most efficient
way to consume data. For example, if one process consumes from p1, p2, p3,
and p1, p2 are sitting on one broker b1 while p3 is sitting on another one b2,
the best thing is to create 2 threads, each using the simple consumer API and
consuming from only one broker. I'm wondering how I would use the new API to
do this.
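
Something like this is what I have in mind for the new API (a rough sketch:
group the partitions by their current leader via partitionsFor(), then give
each group its own consumer and thread via assign(); leader changes would
require re-running the grouping):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class LeaderAwareGrouping
{
  // Returns leader broker id -> partitions led by that broker.
  public static Map<Integer, List<TopicPartition>> groupByLeader(
      KafkaConsumer<?, ?> metadataConsumer, String topic)
  {
    Map<Integer, List<TopicPartition>> byBroker = new HashMap<>();
    for (PartitionInfo pi : metadataConsumer.partitionsFor(topic)) {
      Node leader = pi.leader();
      if (leader == null) {
        continue; // partition currently has no leader, skip it for now
      }
      List<TopicPartition> group = byBroker.get(leader.id());
      if (group == null) {
        group = new ArrayList<>();
        byBroker.put(leader.id(), group);
      }
      group.add(new TopicPartition(pi.topic(), pi.partition()));
    }
    // Each entry can then be handed to a dedicated KafkaConsumer (in its own
    // thread) via assign(partitions).
    return byBroker;
  }
}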

Thanks,
Siyuan

On Mon, Nov 16, 2015 at 4:43 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hi Siyuan,
>
> 1) new consumer is single-threaded, it does not maintain any internal
> threads as the old high-level consumer.
>
> 2) each consumer will only maintain one TCP connection with each broker.
> The only extra socket is the one with its coordinator. That is, if there is
> three brokers S1, S2, S3, and S1 is the coordinator for this consumer, it
> will maintain 4 sockets in total, 2 for S1 (one for fetching, one for
> coordinating) and 1 for S2 and S3 (only for fetching).
>
> 3) Currently the connection is not closed by consumer, although the
> underlying network client / selector will close idle ones after some
> timeout. So in worst case it will only maintain N+1 sockets in total for N
> Kafka brokers at one time.
>
> Guozhang
>
> On Mon, Nov 16, 2015 at 4:22 PM, hsy...@gmail.com <hsy...@gmail.com>
> wrote:
>
> > The new consumer API looks good. If I understand it correctly you can use
> > it like simple consumer or high-level consumer. But I have couple
> questions
> > about it's internal implementation
> >
> > First of all does the consumer have any internal fetcher threads like
> > high-level consumer?
> >
> > When you assign multiple TopicPartitions to a consumer, how many TCP
> > connections it establish to the brokers. Is it same as number of leader
> > brokers that host those partitions or just number of TopicPartitions. If
> > there is any leader broker change does it establish new connections/using
> > existing connections to fetch the data? Can it continue consuming? Also
> is
> > the connection kept until the consumer is closed?
> >
> > Thanks!
> >
> > Best,
> > Siyuan
> >
>
>
>
> --
> -- Guozhang
>


Re: Questions about new consumer API

2015-11-17 Thread hsy...@gmail.com
By efficiency, I mean maximizing throughput while minimizing resources on both
the broker side and the consumer side.

One example: if you have over 200 partitions on 10 brokers and you start 5
consumer processes to consume the data, and each one is single-threaded with
round-robin load distribution, then each process will try to fetch from over
40 partitions one by one, possibly through 10 connections (50 overall). But if
it's smart enough to group partitions by broker, each process can have 2
separate threads, each consuming from a different broker concurrently. That
seems like the more optimal solution of the two, right?
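
To make it concrete, this is roughly what I have in mind with the new API,
based on the partitionsFor()/assign() suggestion below. It's only a sketch
under my own assumptions (placeholder broker/topic/group names, no error
handling, no offset management, and no refresh when partition leadership
moves):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class OneThreadPerBroker {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "broker1:9092");   // placeholder broker
    props.put("group.id", "apex-connector");          // placeholder group
    props.put("key.deserializer",
        "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    props.put("value.deserializer",
        "org.apache.kafka.common.serialization.ByteArrayDeserializer");

    // Look up the current leader of every partition of the topic.
    KafkaConsumer<byte[], byte[]> metadata = new KafkaConsumer<>(props);
    Map<Integer, List<TopicPartition>> byLeader = new HashMap<>();
    for (PartitionInfo p : metadata.partitionsFor("mytopic")) {
      byLeader.computeIfAbsent(p.leader().id(), k -> new ArrayList<>())
              .add(new TopicPartition(p.topic(), p.partition()));
    }
    metadata.close();

    // One consumer (and one thread) per leader broker, each manually assigned
    // only the partitions that broker currently leads.
    for (List<TopicPartition> partitions : byLeader.values()) {
      new Thread(() -> {
        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
        consumer.assign(partitions);
        while (true) {
          ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
          // ... hand records to the operator ...
        }
      }).start();
    }
  }
}

The grouping would obviously go stale when leadership moves, so in practice we
would re-run the partitionsFor() step periodically and re-assign, the same way
our old metadata-monitoring thread does.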

On Tue, Nov 17, 2015 at 2:54 PM, Jason Gustafson <ja...@confluent.io> wrote:

> Hi Siyuan,
>
> Your understanding about assign/subscribe is correct. We think of topic
> subscription as enabling automatic assignment as opposed to doing manual
> assignment through assign(). We don't currently allow them to be mixed.
>
> Can you elaborate on your findings with respect to using one thread per
> broker? In what sense was it more efficient? Doing the same thing might be
> tricky with the new consumer, but I think you could do it using
> partitionsFor() to find the current partition leaders and assign() to set
> the assignment in each thread.
>
> -Jason
>
> On Tue, Nov 17, 2015 at 10:25 AM, hsy...@gmail.com <hsy...@gmail.com>
> wrote:
>
> > Thanks Guozhang,
> >
> > Maybe I should give a few words about what I'm going to achieve with new
> > API
> >
> > Currently, I'm building a new kafka connector for Apache Apex(
> > http://apex.incubator.apache.org/) using 0.9.0 API
> > Apex support dynamic partition, so in the old version, We manage all the
> > consumer partitions in either 1:1 strategy (each consumer process
> consumes
> > only from one kafka partition) or 1:n strategy (each consumer process
> could
> > consume from multiple kafka partitions, using round-robin to distribute)
> > And we also have separate thread to monitor topic metadata change(leader
> > broker change, new partition added, using internal API like ZkUtil etc)
> > and do dynamic partition based on that(for example auto-reconnect to new
> > leader broker, create new partition to consume from new kafka partition
> at
> > runtime).  You can see High-level consumer doesn't work(It can only
> balance
> > between existing consumers unless you manually add new one)  I'm thinking
> > if the new consumer could be used to save some work we did before.
> >
> > I'm still confused with assign() and subscribe().  My understanding is if
> > you use assign() only, the consumer becomes more like a simple consumer
> > except if the leader broker changes it automatically reconnect to the new
> > leader broker, is it correct?   If you use subscribe() method only then
> all
> > the partitions will be distributed to running consumer process with same
> "
> > group.id" using "partition.assignment.strategy". Is it true?
> >
> > So I assume assign() and subscribe()(and group.id
> > partition.assignment.strategy settings) can not be used together?
> >
> > Also in the old API we found one thread per broker is the most efficient
> > way to consume data, for example, if one process consumes from p1, p2, p3
> > and p1,p2 are sitting on one broker b1, p3 is sitting on another one b2,
> > the best thing is create 2 threads each thread use simple consumer API
> and
> > only consume from one broker.  I'm thinking how do I use the new API to
> do
> > this.
> >
> > Thanks,
> > Siyuan
> >
> > On Mon, Nov 16, 2015 at 4:43 PM, Guozhang Wang <wangg...@gmail.com>
> wrote:
> >
> > > Hi Siyuan,
> > >
> > > 1) new consumer is single-threaded, it does not maintain any internal
> > > threads as the old high-level consumer.
> > >
> > > 2) each consumer will only maintain one TCP connection with each
> broker.
> > > The only extra socket is the one with its coordinator. That is, if
> there
> > is
> > > three brokers S1, S2, S3, and S1 is the coordinator for this consumer,
> it
> > > will maintain 4 sockets in total, 2 for S1 (one for fetching, one for
> > > coordinating) and 1 for S2 and S3 (only for fetching).
> > >
> > > 3) Currently the connection is not closed by consumer, although the
> > > underlying network client / selector will close idle ones after some
> > > timeout. So in worst case it will only maintain N+1 sockets in total
> for
> > N
> > > Kafka brokers at one time.
> > >
> > > Guozhang
> > >
> > > On Mon, Nov 16, 201

build error

2015-11-16 Thread hsy...@gmail.com
I got a build error on both trunk and 0.9.0 branch

> docs/producer_config.html (No such file or directory)

Am I missing anything before building?

Thanks,
Siyuan


Questions about new consumer API

2015-11-16 Thread hsy...@gmail.com
The new consumer API looks good. If I understand it correctly, you can use it
like the simple consumer or the high-level consumer. But I have a couple of
questions about its internal implementation.

First of all, does the consumer have any internal fetcher threads like the
high-level consumer does?

When you assign multiple TopicPartitions to a consumer, how many TCP
connections does it establish to the brokers? Is it the same as the number of
leader brokers that host those partitions, or the number of TopicPartitions?
If there is a leader broker change, does it establish new connections or use
existing connections to fetch the data? Can it continue consuming? Also, is
the connection kept open until the consumer is closed?

Thanks!

Best,
Siyuan


Re: build error

2015-11-16 Thread hsy...@gmail.com
What I actually want to do is build and install into my local Maven repository
so I can include the new API in my dependencies. When the release is
officially out, I'll have my code ready to switch to the official Maven
dependency.

Thanks,
Siyuan

On Monday, November 16, 2015, Grant Henke <ghe...@cloudera.com> wrote:

> Hi Siyuan,
>
> My guess is that you are trying to build from a subdirectory. I have a
> minor patch available to fix this that has not been pulled in yet here:
> https://github.com/apache/kafka/pull/509
>
> In the mean time, if you need to build a subproject you can execute a
> command like the following:
> gradle clients:build
>
> Thanks,
> Grant
>
> On Mon, Nov 16, 2015 at 6:33 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Siyuan,
> >
> > Which command did you use to build?
> >
> > Guozhang
> >
> > On Mon, Nov 16, 2015 at 4:01 PM, hsy...@gmail.com <hsy...@gmail.com>
> > wrote:
> >
> > > I got a build error on both trunk and 0.9.0 branch
> > >
> > > > docs/producer_config.html (No such file or directory)
> > >
> > > Do I miss anything before build
> > >
> > > Thanks,
> > > Siyuan
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>
>
>
> --
> Grant Henke
> Software Engineer | Cloudera
> gr...@cloudera.com | twitter.com/gchenke |
> linkedin.com/in/granthenke
>


Re: Packaging new apps

2015-05-11 Thread hsy...@gmail.com
Hi Jean,

Thanks for the change; using an instance tag (is it a new feature in the
latest version? I didn't see it in the older Slider versions) is a really good
idea. It might be good for others to have a template, but not for Kafka. Kafka
is evolving at quite a fast pace, and I've seen many property keys/values
change over the last several releases. Our approach is to keep most properties
at their defaults and only override the ones declared in appConfig.json, which
is actually supported in the current Python script (it may need some changes
for the latest Slider).

Also, a Kafka broker is bound to its local disk once it's launched, so in the
real world there would be at most one instance per NM.

Best,
Siyuan



On Mon, May 11, 2015 at 10:16 AM, Jean-Baptiste Note jbn...@gmail.com
wrote:

 Hi Thomas,

 According to kafka's documentation:
 http://kafka.apache.org/07/configuration.html there should be a default
 value for any added property; I would expect the provided server.properties
 file to actually reflect those default values.
 Therefore, I'd look twice before overconstraining the problem, and would
 just generate the file for those and only those dictionary values that have
 been set in the appConfig (which currently, my code does not, it configures
 too many properties statically, but it can be arranged), relying on the
 default properties for the rest.

 If there's really a case to have all properties at hand, I could:
 * parse the properties file provided in the tarball
 * re-generate the whole conf file with the parsed + overrides

 This, in order to allow for *added* properties (which the current schemes,
 either mine or yours, does not look to allow) AND ultimately, allow for the
 whole tarball installation to be switched to read-only (which could allow
 them to be shared among instances running on the same NM; I don't know if
 slider currently does this kind of optimization).

 Maybe guidance from people more familiar with slider than us would be
 needed here :)

 Kind regards,
 JB



Re: Component instance level configuration

2015-01-29 Thread hsy...@gmail.com
Steve,

I think just exposing the instance id in the agent API should be good enough.
I don't think people need a full override policy for the configuration of
every instance.

Moreover, do you preserve the location of each instance? A service like Kafka
writes data to local disk, so you can't really swap broker ids between
instances after a relaunch.

Thanks!

On Thu, Jan 29, 2015 at 2:44 AM, Steve Loughran ste...@hortonworks.com
wrote:

 I see. There's configuration per component type, but not per instance of
 that type.

 That's an interesting idea...I'll have to think about a way that could be
 made to work

 
 From: 杨浩 yangha...@gmail.com
 Sent: 28 January 2015 00:45
 To: dev@slider.incubator.apache.org
 Subject: Re: Component instance level configuration

 do you mean each instance of the same component will have its own
 configuration? Only the port can be different between instances of a
 component nowadays.

 2015-01-28 5:57 GMT+08:00 hsy...@gmail.com hsy...@gmail.com:

  Hi,
 
  I'm just wondering if there is a way to support component instance level
   configuration. For example, in kafka-on-yarn, kafka broker is a
 component,
  but  each broker instance needs a unique id. It would be better to expose
  the instance id, so that different instance would override some of the
  configurations accordingly
 
  Best,
  Siyuan
 



Component instance level configuration

2015-01-27 Thread hsy...@gmail.com
Hi,

I'm just wondering if there is a way to support component-instance-level
configuration. For example, in kafka-on-yarn, the Kafka broker is a component,
but each broker instance needs a unique id. It would be better to expose the
instance id, so that different instances could override some of the
configuration accordingly.

Best,
Siyuan


Given brokers, is it able to know all the zookeepers that brokers connect to

2014-12-11 Thread hsy...@gmail.com
Hi Guys,

If I know the brokers, is there a way to find out the ZooKeeper hosts from the
broker list?

Thanks!
Siyuan


Re: Is it able to specify the percentage of memory instead of # of MB

2014-12-08 Thread hsy...@gmail.com
Sumit,

Yes, that's the reason.

Best,
Siyuan

On Tue, Dec 2, 2014 at 5:33 PM, Sumit Mohanty smoha...@hortonworks.com
wrote:

 Siyuan,

 can you clarify the underlying requirement that requires an application to
 specify a percentage of available memory? One, as Steve suggested, could be
 to ask for 51% of available on node to guarantee that only one instance can
 be active on the node. Were there other reasons?

 -Sumit

 On Tue, Dec 2, 2014 at 5:29 PM, Ted Yu yuzhih...@gmail.com wrote:

  I created SLIDER-691 for this feature.
 
  Cheers
 
  On Tue, Dec 2, 2014 at 7:09 AM, Steve Loughran ste...@hortonworks.com
  wrote:
 
   I don't even think we can ask yarn for the percentage of memory
  
   I do understand what you are trying to do: ask for 51%.
  
   We could do something where we can query YARN for the maximum amount an
  app
   can ask for (the AM finds this out when registered), and the capacity
  could
   be derived as a percentage of that.
   Rather than percentage of available RAM, it's just a fraction of what
 you
   are allowed.
  
  
   On 2 December 2014 at 01:49, Sumit Mohanty smoha...@hortonworks.com
   wrote:
  
That is not possible. The number provided (# of MB) is used to ask
 for
container from YARN and YARN only accepts numbers.
   
% of total on each node also becomes an issue when nodes can have
  varying
value of total memory available. This is common for heterogeneous
   clusters
where hosts may have different value for available memory and core,
  etc.
   
On Mon, Dec 1, 2014 at 1:39 PM, hsy...@gmail.com hsy...@gmail.com
   wrote:
   
 Hi guys,

 For each component instance, Is there a way to specify the % of
 total
 memory of each node instead of # of MB?

 Best,
 Siyuan

   



Questions about new consumer API

2014-12-02 Thread hsy...@gmail.com
Hi guys,

I'm interested in the new Consumer API.
http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/

I have a couple of questions.
1. The doc says the Kafka consumer will automatically do load balancing. Is it
based on throughput, or the same as what we have now, i.e. balancing the
number of partitions among all consumers in the same ConsumerGroup? In a real
case, different partitions can have different peak times.
2. In the API there is a subscribe(partition...) method described as not using
group management. Does it mean the group.id property will be discarded and the
developer has full control over distributing partitions to consumers?
3. Is the new API compatible with old brokers?
4. Will the simple consumer API and the high-level consumer API still be
supported?

Thanks!

Best,
Siyuan


Re: Questions about new consumer API

2014-12-02 Thread hsy...@gmail.com
Thanks Neha. Another question: if offsets are stored under group.id, does it
mean that within one group there should be at most one subscriber for each
topic partition?
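
Just to check my mental model of where the offsets live, here is a minimal
sketch. The method names here follow the manual-assignment mode as it ends up
in the released 0.9 consumer (where the per-partition subscribe(...) from the
preview javadoc becomes assign(...)), so treat them as an assumption on my
part; the broker, topic and group names are placeholders:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class OffsetsPerGroup {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "broker1:9092");   // placeholder broker
    props.put("group.id", "my-connector");            // only namespaces the offsets here
    props.put("enable.auto.commit", "false");
    props.put("key.deserializer",
        "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    props.put("value.deserializer",
        "org.apache.kafka.common.serialization.ByteArrayDeserializer");

    KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
    TopicPartition tp = new TopicPartition("mytopic", 0);
    consumer.assign(Collections.singletonList(tp));   // manual assignment, no balancing

    // Resume from whatever was last committed under (group.id, topic, partition).
    OffsetAndMetadata committed = consumer.committed(tp);
    if (committed != null) {
      consumer.seek(tp, committed.offset());
    }

    ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
    // ... process records ...
    consumer.commitSync();   // stores the offset under this group.id
    consumer.close();
  }
}

If two consumers with the same group.id both took partition 0 here, they would
just overwrite each other's committed offset, so I guess the "one subscriber
per partition per group" rule really only matters when group management does
the assignment?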

Best,
Siyuan

On Tue, Dec 2, 2014 at 12:55 PM, Neha Narkhede neha.narkh...@gmail.com
wrote:

 1. In this doc it says kafka consumer will automatically do load balance.
 Is it based on throughtput or same as what we have now balance the
 cardinality among all consumers in same ConsumerGroup? In a real case
 different partitions could have different peak time.

 Load balancing is still based on # of partitions for the subscribed topics
 and
 ensuring that each partition has exactly one consumer as the owner.

 2. In the API, threre is subscribe(partition...) method saying not using
 group management, does it mean the group.id property will be discarded and
 developer has full control of distributing partitions to consumers?

 group.id is also required for offset management, if the user chooses to
 use
 Kafka based offset management. The user will have full control over
 distribution
 of partitions to consumers.

 3. Is new API compatible with old broker?

 Yes, it will.

 4. Will simple consumer api and high-level consumer api still be supported?

 Over time, we will phase out the current high-level and simple consumer
 since the
 0.9 API supports both.

 Thanks,
 Neha

 On Tue, Dec 2, 2014 at 12:07 PM, hsy...@gmail.com hsy...@gmail.com
 wrote:

  Hi guys,
 
  I'm interested in the new Consumer API.
  http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/
 
  I have couple of question.
  1. In this doc it says kafka consumer will automatically do load balance.
  Is it based on throughtput or same as what we have now balance the
  cardinality among all consumers in same ConsumerGroup? In a real case
  different partitions could have different peak time.
  2. In the API, threre is subscribe(partition...) method saying not using
  group management, does it mean the group.id property will be discarded
 and
  developer has full control of distributing partitions to consumers?
  3. Is new API compatible with old broker?
  4. Will simple consumer api and high-level consumer api still be
 supported?
 
  Thanks!
 
  Best,
  Siyuan
 



Is it able to specify the percentage of memory instead of # of MB

2014-12-01 Thread hsy...@gmail.com
Hi guys,

For each component instance, is there a way to specify a % of the total memory
of each node instead of a # of MB?

Best,
Siyuan


Is there a plan to build a ubiquitous web service API to manage the kafka cluster

2014-11-24 Thread hsy...@gmail.com
Hi guys,

Nowadays, all Kafka administration work (adding and tearing down nodes, topic
management, throughput monitoring) is done by various different tools talking
to the brokers, ZooKeeper, etc. Is there a plan for the core team to build a
central, universal server providing a web service API to do all the admin
work?

Best,
Siyuan


I can't specify user by calling Execute method

2014-11-20 Thread hsy...@gmail.com
Hi, I specified the user when calling Execute(..., user='hdp', ...), but the
container processes are all launched as the *yarn* user.


slider stop bug?

2014-11-18 Thread hsy...@gmail.com
If the application is stuck in the ACCEPTED state, the slider stop command
doesn't stop the YARN application.

I didn't set up the scheduler address correctly, so the application got stuck
in the ACCEPTED state and I couldn't use slider stop to stop it.

Best,
Siyuan


Re: slider stop bug?

2014-11-18 Thread hsy...@gmail.com
I'm using 0.60

On Tue, Nov 18, 2014 at 12:26 PM, Sumit Mohanty smoha...@hortonworks.com
wrote:

 You can use --force.

 Are you using the RC for 0.60 or 0.40?

 On Tue, Nov 18, 2014 at 12:00 PM, hsy...@gmail.com hsy...@gmail.com
 wrote:

  If the application is stuck in ACCEPT status, slider stop command doesn't
  stop the yarn application.
 
  I didn't setup the scheduler address correctly so the application stuck
 at
  ACCEPT status and I couldn't use slider stop to stop the application
 
  Best,
  Siyuan
 




Application configuration page is outdated?

2014-11-18 Thread hsy...@gmail.com
Hi guys,

I just switched from 0.40 to 0.60 and found that the application configuration
doesn't work any more:
http://slider.incubator.apache.org/docs/slider_specs/application_instance_configuration.html

For example:

{
  "schema": "http://example.org/specification/v2.0.0",
  "metadata": {
  },
  "global": {
    "config_types": "core-site,hdfs-site,hbase-site",

    "java_home": "/usr/jdk64/jdk1.7.0_45",
    "package_list": "files/hbase-0.96.1-hadoop2-bin.tar",
    "create.default.zookeeper.node": "true",

    "site.global.app_user": "yarn",
    "site.global.app_log_dir": "${AGENT_LOG_ROOT}/app/log",
    "site.global.app_pid_dir": "${AGENT_WORK_ROOT}/app/run",
    "site.global.security_enabled": "false",

    "site.hbase-site.hbase.hstore.flush.retries.number": "120",
    "site.hbase-site.hbase.client.keyvalue.maxsize": "10485760",
    "site.hbase-site.hbase.hstore.compactionThreshold": "3",
    "site.hbase-site.hbase.rootdir": "${NN_URI}/apps/hbase/data",
    "site.hbase-site.hbase.tmp.dir": "${AGENT_WORK_ROOT}/work/app/tmp",
    "site.hbase-site.hbase.master.info.port": "${HBASE_MASTER.ALLOCATED_PORT}",
    "site.hbase-site.hbase.regionserver.port": "0",
    "site.hbase-site.zookeeper.znode.parent": "${DEF_ZK_PATH}",

    "site.core-site.fs.defaultFS": "${NN_URI}",
    "site.hdfs-site.dfs.namenode.https-address": "${NN_HOST}:50470",
    "site.hdfs-site.dfs.namenode.http-address": "${NN_HOST}:50070"
  }
}


I can't get config_types in the agent Python script anymore. Only the global
properties are set and picked up.



Thanks!


Re: Application configuration page is outdated?

2014-11-18 Thread hsy...@gmail.com
I just figured out:

config_types has been changed to system_configs.

Alternatively, you can declare a configFile in metainfo.xml and create a
configuration folder with the xml or env configuration files in it.


On Tue, Nov 18, 2014 at 2:04 PM, Sumit Mohanty smoha...@hortonworks.com
wrote:

 Sharing the metainfo.xml, appConfig.json, and resources.json should be
 enough.

 -Sumit

 On Tue, Nov 18, 2014 at 2:00 PM, Sumit Mohanty smoha...@hortonworks.com
 wrote:

  Is it possible for you to share the application package? I can browse
  through it to see what needs to change.
 
  On Tue, Nov 18, 2014 at 1:48 PM, hsy...@gmail.com hsy...@gmail.com
  wrote:
 
  Hi guys,
 
  I just switch from 40 to 60 and I found the application configuration
  doesn't work any more.
 
 
 http://slider.incubator.apache.org/docs/slider_specs/application_instance_configuration.html
 
  For example :
 
  {
schema : http://example.org/specification/v2.0.0;,
metadata : {
},
global : {
config_types: core-site,hdfs-site,hbase-site,
 
java_home: /usr/jdk64/jdk1.7.0_45,
package_list: files/hbase-0.96.1-hadoop2-bin.tar,
create.default.zookeeper.node: true
 
site.global.app_user: yarn,
site.global.app_log_dir: ${AGENT_LOG_ROOT}/app/log,
site.global.app_pid_dir: ${AGENT_WORK_ROOT}/app/run,
site.global.security_enabled: false,
 
site.hbase-site.hbase.hstore.flush.retries.number: 120,
site.hbase-site.hbase.client.keyvalue.maxsize: 10485760,
site.hbase-site.hbase.hstore.compactionThreshold: 3,
site.hbase-site.hbase.rootdir: ${NN_URI}/apps/hbase/data,
site.hbase-site.hbase.tmp.dir:
 ${AGENT_WORK_ROOT}/work/app/tmp,
site.hbase-site.hbase.master.info.port:
  ${HBASE_MASTER.ALLOCATED_PORT},
site.hbase-site.hbase.regionserver.port: 0,
site.hbase-site.zookeeper.znode.parent: ${DEF_ZK_PATH},
 
site.core-site.fs.defaultFS: ${NN_URI},
site.hdfs-site.dfs.namenode.https-address: ${NN_HOST}:50470,
site.hdfs-site.dfs.namenode.http-address: ${NN_HOST}:50070
}
  }
 
 
  I can't get config_types in agent python script anymore. Only global
  properties are set and caught
 
 
 
  Thanks!
 
 
 




How to try 1.7?

2014-11-17 Thread hsy...@gmail.com
Hi guys,

Is there an easy way to try 1.7, probably with HDP2.2?

Thanks!


Do I have to repackage if I change appConfig.json/resources.json

2014-11-14 Thread hsy...@gmail.com
Every time I change appConfig.json and resources.json, do I have to repackage
the zip file and redeploy it to HDFS?

Thanks!


Re: Do I have to repackage if I change appConfig.json/resources.json

2014-11-14 Thread hsy...@gmail.com
So actually you don't need appConfig.json and resources.json in the ZIP
package?

On Fri, Nov 14, 2014 at 12:05 PM, Jon Maron jma...@hortonworks.com wrote:

 Perhaps I’m misunderstanding your question, but in general, if making
 modifications to those files, you can simply create a new application
 instance referencing the new versions of the file from the command line:

 ./slider create app name —template appConfig path —resources
 resources file path

 the app config references the application package in HDFS, which can be
 pre-seeded using “slider install-package”

 — Jon

 On Nov 14, 2014, at 2:54 PM, hsy...@gmail.com wrote:

  Everytime I change appConfig.json and resources.json. Do I have to
  repackage the zip file and redeploy the file to hdfs?
 
  Thanks!





Re: Do I have to repackage if I change appConfig.json/resources.json

2014-11-14 Thread hsy...@gmail.com
I'm getting confused. After you edit your appConfig.json and resources.json,
you don't put them back in the package zip file? So the appConfig-default.json
and resources-default.json are outdated?

Another question: what I'm actually asking is how to change the configuration
without recreating the cluster. Sometimes I just want to increase the memory
or add more instances, for example, and I want the application to recover from
the last snapshot (run on the same machines it was allocated to before).

Thank you!




On Fri, Nov 14, 2014 at 2:14 PM, Sumit Mohanty smoha...@hortonworks.com
wrote:

 Nope. This is what I do typically.

 * Get an application package zip file
 * Extract the appConfig-default.json  and resources-default.json from the
 package
 * Rename the files to appConfig.json  and resources.json respectively and
 edit as needed
 * Use install-package to upload the application package zip file
 * Call create with  --template appConfig.json --resources
 resources.json ...

 -Sumit

 On Fri, Nov 14, 2014 at 2:08 PM, hsy...@gmail.com hsy...@gmail.com
 wrote:

  So actually you don't need appConfig.json and resources.json in the ZIP
  package?
 
  On Fri, Nov 14, 2014 at 12:05 PM, Jon Maron jma...@hortonworks.com
  wrote:
 
   Perhaps I’m misunderstanding your question, but in general, if making
   modifications to those files, you can simply create a new application
   instance referencing the new versions of the file from the command
 line:
  
   ./slider create app name —template appConfig path —resources
   resources file path
  
   the app config references the application package in HDFS, which can be
   pre-seeded using “slider install-package”
  
   — Jon
  
   On Nov 14, 2014, at 2:54 PM, hsy...@gmail.com wrote:
  
Everytime I change appConfig.json and resources.json. Do I have to
repackage the zip file and redeploy the file to hdfs?
   
Thanks!
  
  



How to change log level for Slider AM?

2014-11-14 Thread hsy...@gmail.com
How to change log level for Slider AM?

Thanks!


Re: Do I have to repackage if I change appConfig.json/resources.json

2014-11-14 Thread hsy...@gmail.com
I'm actually using 0.40

On Fri, Nov 14, 2014 at 4:21 PM, Sumit Mohanty smoha...@hortonworks.com
wrote:

  I'm getting confused. After you edit your appConfig.json and
 resources.json.
 You don't put it back in your package zip file? So the
 appConfig-default.json
 and resource-default.json is outdated?

 The file in the .zip package is not used by the running application. The
 files that are used are the ones that are provided by command line when
 create is called. *We suggest adding sample defaults to the .zip file as
 reference so that there are only one file to download for the users.*

 Which branch are you using? (for me to answer the next questions)

 On Fri, Nov 14, 2014 at 3:47 PM, hsy...@gmail.com hsy...@gmail.com
 wrote:

  I'm getting confused. After you edit your appConfig.json and
  resources.json. You don't put it back in your package zip file? So the
  appConfig-default.json and resource-default.json is outdated?
 
 
  Another question is actually I'm asking how to change the configuration
  without recreate the cluster. Sometime I just want to increase the
 memory/
  add more instances for example and I want the application recover from
 last
  snapshot(run on same machines it allocated before)
 
  Thank you!
 
 
 
 
  On Fri, Nov 14, 2014 at 2:14 PM, Sumit Mohanty smoha...@hortonworks.com
 
  wrote:
 
   Nope. This is what I do typically.
  
   * Get an application package zip file
   * Extract the appConfig-default.json  and resources-default.json from
 the
   package
   * Rename the files to appConfig.json  and resources.json respectively
 and
   edit as needed
   * Use install-package to upload the application package zip file
   * Call create with  --template appConfig.json --resources
   resources.json ...
  
   -Sumit
  
   On Fri, Nov 14, 2014 at 2:08 PM, hsy...@gmail.com hsy...@gmail.com
   wrote:
  
So actually you don't need appConfig.json and resources.json in the
 ZIP
package?
   
On Fri, Nov 14, 2014 at 12:05 PM, Jon Maron jma...@hortonworks.com
wrote:
   
 Perhaps I’m misunderstanding your question, but in general, if
 making
 modifications to those files, you can simply create a new
 application
 instance referencing the new versions of the file from the command
   line:

 ./slider create app name —template appConfig path —resources
 resources file path

 the app config references the application package in HDFS, which
 can
  be
 pre-seeded using “slider install-package”

 — Jon

 On Nov 14, 2014, at 2:54 PM, hsy...@gmail.com wrote:

  Everytime I change appConfig.json and resources.json. Do I have
 to
  repackage the zip file and redeploy the file to hdfs?
 
  Thanks!





Question about behavior after container failures

2014-11-14 Thread hsy...@gmail.com
Hi all,

From the documentation, I understood that Slider will try to recover on
container failure, but in my test application it doesn't.

I'm using the 0.40 release, built from source.

Here is what I get.

*If I kill the child process*
The agent's check_process_status method raises ComponentIsNotRunning and never
comes back:

2014-11-14 16:18:40,274 - Error while executing command 'status':
Traceback (most recent call last):
  File 
/yarn/nm/usercache/vagrant/appcache/application_1415305968048_0008/container_1415305968048_0008_01_02/infra/agent/slider-agent/resource_management/libraries/script/script.py,
line 114, in execute
method(env)
  File 
/yarn/nm/usercache/vagrant/appcache/application_1415305968048_0008/container_1415305968048_0008_01_02/app/definition/package/scripts/kafka.py,
line 60, in status
check_process_status(status_params.pid_file)
  File 
/yarn/nm/usercache/vagrant/appcache/application_1415305968048_0008/container_1415305968048_0008_01_02/infra/agent/slider-agent/resource_management/libraries/functions/check_process_status.py,
line 45, in check_process_status
raise ComponentIsNotRunning()
ComponentIsNotRunning


*If I kill the agent process*, it's just silently gone



Best,

Siyuan


Re: Do I have to repackage if I change appConfig.json/resources.json

2014-11-14 Thread hsy...@gmail.com
The only problem is that I'd have to upgrade to Hadoop 2.6?

On Fri, Nov 14, 2014 at 5:08 PM, Sumit Mohanty smoha...@hortonworks.com
wrote:

 Is it possible for you to pick up the latest RC (0.60)? I can help you
 update the existing application package, if needed.


 Meanwhile, for 0.40, the log4j.properties file in
 slider-client-install-dir/conf can be modified to up the log level and the
 create call will push the file as a localized file for the AppMaster.

  Another question is actually I'm asking how to change the
 configuration without
 recreate the cluster. Sometime I just want to increase the memory/add more
 instances for example and I want the application recover from last
 snapshot(run
 on same machines it allocated before)

  increase the memory
 This is not possible when the app is running. Yarn does have a support for
 that. You need to stop the application and then modify the values. After
 topping the application you need to edit the resources.json file in HDFS
 to modify the value
 (e.g. ./user/USER1/slider/cluster/CLUSTER1/resources.json).

  add more instances for example

 Flex command can be used while the application is running.

  want the application recover from last snapshot
 If you stop and then start (probably it is freeze and thaw in 0.40) the
 application then it will use the hostnames that it remembered.

 -Sumit


 On Fri, Nov 14, 2014 at 4:35 PM, hsy...@gmail.com hsy...@gmail.com
 wrote:

  I'm actually using 0.40
 
  On Fri, Nov 14, 2014 at 4:21 PM, Sumit Mohanty smoha...@hortonworks.com
 
  wrote:
 
I'm getting confused. After you edit your appConfig.json and
   resources.json.
   You don't put it back in your package zip file? So the
   appConfig-default.json
   and resource-default.json is outdated?
  
   The file in the .zip package is not used by the running application.
 The
   files that are used are the ones that are provided by command line when
   create is called. *We suggest adding sample defaults to the .zip file
  as
   reference so that there are only one file to download for the users.*
  
   Which branch are you using? (for me to answer the next questions)
  
   On Fri, Nov 14, 2014 at 3:47 PM, hsy...@gmail.com hsy...@gmail.com
   wrote:
  
I'm getting confused. After you edit your appConfig.json and
resources.json. You don't put it back in your package zip file? So
 the
appConfig-default.json and resource-default.json is outdated?
   
   
Another question is actually I'm asking how to change the
 configuration
without recreate the cluster. Sometime I just want to increase the
   memory/
add more instances for example and I want the application recover
 from
   last
snapshot(run on same machines it allocated before)
   
Thank you!
   
   
   
   
On Fri, Nov 14, 2014 at 2:14 PM, Sumit Mohanty 
  smoha...@hortonworks.com
   
wrote:
   
 Nope. This is what I do typically.

 * Get an application package zip file
 * Extract the appConfig-default.json  and resources-default.json
 from
   the
 package
 * Rename the files to appConfig.json  and resources.json
 respectively
   and
 edit as needed
 * Use install-package to upload the application package zip file
 * Call create with  --template appConfig.json --resources
 resources.json ...

 -Sumit

 On Fri, Nov 14, 2014 at 2:08 PM, hsy...@gmail.com 
 hsy...@gmail.com
 wrote:

  So actually you don't need appConfig.json and resources.json in
 the
   ZIP
  package?
 
  On Fri, Nov 14, 2014 at 12:05 PM, Jon Maron 
  jma...@hortonworks.com
  wrote:
 
   Perhaps I’m misunderstanding your question, but in general, if
   making
   modifications to those files, you can simply create a new
   application
   instance referencing the new versions of the file from the
  command
 line:
  
   ./slider create app name —template appConfig path
 —resources
   resources file path
  
   the app config references the application package in HDFS,
 which
   can
be
   pre-seeded using “slider install-package”
  
   — Jon
  
   On Nov 14, 2014, at 2:54 PM, hsy...@gmail.com wrote:
  
Everytime I change appConfig.json and resources.json. Do I
 have
   to
repackage the zip file and redeploy the file to hdfs?
   
Thanks!
  
  

Re: Do I have to repackage if I change appConfig.json/resources.json

2014-11-14 Thread hsy...@gmail.com
This is not possible when the app is running. Yarn does have a support
for
that. You need to stop the application and then modify the values. After
topping the application you need to edit the resources.json file in HDFS
to modify the value
(e.g. ./user/USER1/slider/cluster/CLUSTER1/resources.json).

This is fine. I don't need to change this at run time, but I don't want to
recreate either. Can I just stop, edit, and start the application again so
that the new properties get picked up?

If you stop and then start (probably it is freeze and thaw in 0.40)
the
application then it will use the hostnames that it remembered.

What if I use *yarn application -kill* to stop the application?

On Fri, Nov 14, 2014 at 5:09 PM, hsy...@gmail.com hsy...@gmail.com wrote:

 The only problem is I have to upgrade to hadoop 2.6?

 On Fri, Nov 14, 2014 at 5:08 PM, Sumit Mohanty smoha...@hortonworks.com
 wrote:

 Is it possible for you to pick up the latest RC (0.60)? I can help you
 update the existing application package, if needed.


 Meanwhile, for 0.40, the log4j.properties file in
 slider-client-install-dir/conf can be modified to up the log level and the
 create call will push the file as a localized file for the AppMaster.

  Another question is actually I'm asking how to change the
 configuration without
 recreate the cluster. Sometime I just want to increase the memory/add more
 instances for example and I want the application recover from last
 snapshot(run
 on same machines it allocated before)

  increase the memory
 This is not possible when the app is running. Yarn does have a support for
 that. You need to stop the application and then modify the values. After
 topping the application you need to edit the resources.json file in HDFS
 to modify the value
 (e.g. ./user/USER1/slider/cluster/CLUSTER1/resources.json).

  add more instances for example

 Flex command can be used while the application is running.

  want the application recover from last snapshot
 If you stop and then start (probably it is freeze and thaw in 0.40)
 the
 application then it will use the hostnames that it remembered.

 -Sumit


 On Fri, Nov 14, 2014 at 4:35 PM, hsy...@gmail.com hsy...@gmail.com
 wrote:

  I'm actually using 0.40
 
  On Fri, Nov 14, 2014 at 4:21 PM, Sumit Mohanty 
 smoha...@hortonworks.com
  wrote:
 
I'm getting confused. After you edit your appConfig.json and
   resources.json.
   You don't put it back in your package zip file? So the
   appConfig-default.json
   and resource-default.json is outdated?
  
   The file in the .zip package is not used by the running application.
 The
   files that are used are the ones that are provided by command line
 when
   create is called. *We suggest adding sample defaults to the .zip
 file
  as
   reference so that there are only one file to download for the users.*
  
   Which branch are you using? (for me to answer the next questions)
  
   On Fri, Nov 14, 2014 at 3:47 PM, hsy...@gmail.com hsy...@gmail.com
   wrote:
  
I'm getting confused. After you edit your appConfig.json and
resources.json. You don't put it back in your package zip file? So
 the
appConfig-default.json and resource-default.json is outdated?
   
   
Another question is actually I'm asking how to change the
 configuration
without recreate the cluster. Sometime I just want to increase the
   memory/
add more instances for example and I want the application recover
 from
   last
snapshot(run on same machines it allocated before)
   
Thank you!
   
   
   
   
On Fri, Nov 14, 2014 at 2:14 PM, Sumit Mohanty 
  smoha...@hortonworks.com
   
wrote:
   
 Nope. This is what I do typically.

 * Get an application package zip file
 * Extract the appConfig-default.json  and resources-default.json
 from
   the
 package
 * Rename the files to appConfig.json  and resources.json
 respectively
   and
 edit as needed
 * Use install-package to upload the application package zip file
 * Call create with  --template appConfig.json --resources
 resources.json ...

 -Sumit

 On Fri, Nov 14, 2014 at 2:08 PM, hsy...@gmail.com 
 hsy...@gmail.com
 wrote:

  So actually you don't need appConfig.json and resources.json in
 the
   ZIP
  package?
 
  On Fri, Nov 14, 2014 at 12:05 PM, Jon Maron 
  jma...@hortonworks.com
  wrote:
 
   Perhaps I’m misunderstanding your question, but in general, if
   making
   modifications to those files, you can simply create a new
   application
   instance referencing the new versions of the file from the
  command
 line:
  
   ./slider create app name —template appConfig path
 —resources
   resources file path
  
   the app config references the application package in HDFS,
 which
   can
be
   pre-seeded using “slider install-package”
  
   — Jon
  
   On Nov 14, 2014, at 2:54 PM, hsy...@gmail.com wrote

Re: log4j dir?

2014-11-14 Thread hsy...@gmail.com
Does anyone have an idea how I can configure the log4j file dir?

On Thu, Nov 13, 2014 at 4:58 PM, hsy...@gmail.com hsy...@gmail.com wrote:

 Hi guys,

 Just notice kafka.logs.dir in log4j.properties doesn't take effect

 It's always set to *$base_dir/logs* in kafka-run-class.sh

 LOG_DIR=$base_dir/logs
 KAFKA_LOG4J_OPTS=-Dkafka.logs.dir=$LOG_DIR $KAFKA_LOG4J_OPTS

 Best,
 Siyuan



Re: log4j dir?

2014-11-14 Thread hsy...@gmail.com
I think there is no way to specify a different log location without modifying
the shell script.
The *kafka.logs.dir* property in the log4j.properties file is misleading.

On Fri, Nov 14, 2014 at 1:24 PM, Ben Drees ben.dr...@gmail.com wrote:

 Hi,

 I had trouble with this as well.  The version of Kafka I'm running insists
 on using 'kafka/logs', so I create a soft link from there to the desired
 destination directory:

 # kafka scripts hard-code the logs dir, so point that path to where we want
 the logs to be.
 ln -s $STREAM_BUFFER_LOGS_DIR kafka/logs

 -Ben


 On Fri, Nov 14, 2014 at 11:17 AM, hsy...@gmail.com hsy...@gmail.com
 wrote:

  Anyone has any idea how do I config the log4j file dir?
 
  On Thu, Nov 13, 2014 at 4:58 PM, hsy...@gmail.com hsy...@gmail.com
  wrote:
 
   Hi guys,
  
   Just notice kafka.logs.dir in log4j.properties doesn't take effect
  
   It's always set to *$base_dir/logs* in kafka-run-class.sh
  
   LOG_DIR=$base_dir/logs
   KAFKA_LOG4J_OPTS=-Dkafka.logs.dir=$LOG_DIR $KAFKA_LOG4J_OPTS
  
   Best,
   Siyuan
  
 



Re: Q: How to define component configuration?

2014-11-13 Thread hsy...@gmail.com
https://issues.apache.org/jira/browse/SLIDER-648  created. Thanks!

On Tue, Nov 11, 2014 at 5:32 PM, Sumit Mohanty sumit.moha...@gmail.com
wrote:

 components: {
 COMPONENT1: {
   *mykey: myvalue*
 },

 This is not wired up in the AgentProviderService to send to the agents. So
 as a work-around you may have to use something like

 global: {
 COMPONENT1.mykey: myvaluep,

 Can you file a JIRA to add support for reading component specific configs
 from appConfig.json and make them available at the container? This seems to
 be a good feature to support.

 -Sumit

 On Tue, Nov 11, 2014 at 2:42 PM, hsy...@gmail.com hsy...@gmail.com
 wrote:

  Thanks Steve, but I logged all the properties in params.py,  I couldn't
  find any key named *mykey*
 
 
  *config = Script.get_config()*
for key in config.keys():
  print key:  + key
for key in config['global'].keys():
  print key:  + key
 
  Best
 
  On Tue, Nov 11, 2014 at 5:04 AM, Steve Loughran ste...@hortonworks.com
  wrote:
 
   that should be it. What happens each component gets the properties of
  
   component-level union global-level
  
   that is, everything that is global, extended with anything that is at
 the
   component level. If a component overrides the global value, that
 override
   is picked up
  
   It's essentially a form of prototype-based programming, except only of
   properties, not methods:
   http://en.wikipedia.org/wiki/Prototype-based_programming
  
   On 11 November 2014 01:30, hsy...@gmail.com hsy...@gmail.com wrote:
  
Thanks Ted, but back to my first question, how can you define
 component
level property? in appConfig.json?
   
I tried to define like this :
   
   
{
  schema: http://example.org/specification/v2.0.0;,
  metadata: {
  },
  global: {
application.def: app-package-0.1.zip,
java_home: /usr/lib/jvm/java-7-oracle/,
package_list: files/app.tgz,
agent.conf: /user/siyuan/agent/conf/agent.ini,
   
site.global.app_user: siyuan,
site.global.app_root:
${AGENT_WORK_ROOT}/app/install/kafka_2.10-0.8.1.1,
site.global.app_install_dir: ${AGENT_WORK_ROOT}/app/install,
site.global.pid_file: ${AGENT_WORK_ROOT}/app/run/app.pid,
   
  },
  components: {
COMPONENT1: {
  *mykey: myvalue*
},
slider-appmaster: {
  jvm.heapsize: 256M
}
  }
}
   
Is it able to make the component1 read the value for *mykey* ?
   
   
Best,
Siyuan
   
On Mon, Nov 10, 2014 at 4:15 PM, Ted Yu yuzhih...@gmail.com wrote:
   
 To my knowledge, there is no direct support for this.

 You can create different components, each with corresponding
   properties.

 Cheers

 On Mon, Nov 10, 2014 at 4:13 PM, hsy...@gmail.com 
 hsy...@gmail.com
 wrote:

  If I want to have several instances of some component. But I want
  to
set
  some of the properties to different value for different
 instances.
   How
 can
  I do it?
 
  Thanks!
 
  Best,
  Siyuan
 
  On Mon, Nov 10, 2014 at 1:26 PM, hsy...@gmail.com 
  hsy...@gmail.com
  wrote:
 
   Hi guys,
  
   Is there an example of component configuration? Is there a way
 to
give
   different value to same property for different instances?
  
   Siyuan
  
 

   
  
  
 



 --
 thanks
 Sumit



log4j dir?

2014-11-13 Thread hsy...@gmail.com
Hi guys,

Just noticed that kafka.logs.dir in log4j.properties doesn't take effect.

It's always set to *$base_dir/logs* in kafka-run-class.sh

LOG_DIR=$base_dir/logs
KAFKA_LOG4J_OPTS=-Dkafka.logs.dir=$LOG_DIR $KAFKA_LOG4J_OPTS

Best,
Siyuan


Re: Q: How to define component configuration?

2014-11-11 Thread hsy...@gmail.com
Thanks Steve, but I logged all the properties in params.py, and I couldn't
find any key named *mykey*:

config = Script.get_config()
for key in config.keys():
  print "key: " + key
for key in config['global'].keys():
  print "key: " + key

Best

On Tue, Nov 11, 2014 at 5:04 AM, Steve Loughran ste...@hortonworks.com
wrote:

 that should be it. What happens each component gets the properties of

 component-level union global-level

 that is, everything that is global, extended with anything that is at the
 component level. If a component overrides the global value, that override
 is picked up

 It's essentially a form of prototype-based programming, except only of
 properties, not methods:
 http://en.wikipedia.org/wiki/Prototype-based_programming

 On 11 November 2014 01:30, hsy...@gmail.com hsy...@gmail.com wrote:

  Thanks Ted, but back to my first question, how can you define component
  level property? in appConfig.json?
 
  I tried to define like this :
 
 
  {
schema: http://example.org/specification/v2.0.0;,
metadata: {
},
global: {
  application.def: app-package-0.1.zip,
  java_home: /usr/lib/jvm/java-7-oracle/,
  package_list: files/app.tgz,
  agent.conf: /user/siyuan/agent/conf/agent.ini,
 
  site.global.app_user: siyuan,
  site.global.app_root:
  ${AGENT_WORK_ROOT}/app/install/kafka_2.10-0.8.1.1,
  site.global.app_install_dir: ${AGENT_WORK_ROOT}/app/install,
  site.global.pid_file: ${AGENT_WORK_ROOT}/app/run/app.pid,
 
},
components: {
  COMPONENT1: {
*mykey: myvalue*
  },
  slider-appmaster: {
jvm.heapsize: 256M
  }
}
  }
 
  Is it able to make the component1 read the value for *mykey* ?
 
 
  Best,
  Siyuan
 
  On Mon, Nov 10, 2014 at 4:15 PM, Ted Yu yuzhih...@gmail.com wrote:
 
   To my knowledge, there is no direct support for this.
  
   You can create different components, each with corresponding
 properties.
  
   Cheers
  
   On Mon, Nov 10, 2014 at 4:13 PM, hsy...@gmail.com hsy...@gmail.com
   wrote:
  
If I want to have several instances of some component. But I want to
  set
some of the properties to different value for different instances.
 How
   can
I do it?
   
Thanks!
   
Best,
Siyuan
   
On Mon, Nov 10, 2014 at 1:26 PM, hsy...@gmail.com hsy...@gmail.com
wrote:
   
 Hi guys,

 Is there an example of component configuration? Is there a way to
  give
 different value to same property for different instances?

 Siyuan

   
  
 




Q: How to define component configuration?

2014-11-10 Thread hsy...@gmail.com
Hi guys,

Is there an example of component configuration? Is there a way to give a
different value to the same property for different instances?

Siyuan


Re: Q: How to define component configuration?

2014-11-10 Thread hsy...@gmail.com
I want to have several instances of some component, but I want to set some of
the properties to different values for different instances. How can I do it?

Thanks!

Best,
Siyuan

On Mon, Nov 10, 2014 at 1:26 PM, hsy...@gmail.com hsy...@gmail.com wrote:

 Hi guys,

 Is there an example of component configuration? Is there a way to give
 different value to same property for different instances?

 Siyuan



Re: Q: How to define component configuration?

2014-11-10 Thread hsy...@gmail.com
Thanks Ted, but back to my first question: how can you define a
component-level property in appConfig.json?

I tried to define it like this:


{
  "schema": "http://example.org/specification/v2.0.0",
  "metadata": {
  },
  "global": {
    "application.def": "app-package-0.1.zip",
    "java_home": "/usr/lib/jvm/java-7-oracle/",
    "package_list": "files/app.tgz",
    "agent.conf": "/user/siyuan/agent/conf/agent.ini",

    "site.global.app_user": "siyuan",
    "site.global.app_root": "${AGENT_WORK_ROOT}/app/install/kafka_2.10-0.8.1.1",
    "site.global.app_install_dir": "${AGENT_WORK_ROOT}/app/install",
    "site.global.pid_file": "${AGENT_WORK_ROOT}/app/run/app.pid"
  },
  "components": {
    "COMPONENT1": {
      "mykey": "myvalue"
    },
    "slider-appmaster": {
      "jvm.heapsize": "256M"
    }
  }
}

Is it possible to make COMPONENT1 read the value for *mykey*?


Best,
Siyuan

On Mon, Nov 10, 2014 at 4:15 PM, Ted Yu yuzhih...@gmail.com wrote:

 To my knowledge, there is no direct support for this.

 You can create different components, each with corresponding properties.

 Cheers

 On Mon, Nov 10, 2014 at 4:13 PM, hsy...@gmail.com hsy...@gmail.com
 wrote:

  If I want to have several instances of some component. But I want to set
  some of the properties to different value for different instances. How
 can
  I do it?
 
  Thanks!
 
  Best,
  Siyuan
 
  On Mon, Nov 10, 2014 at 1:26 PM, hsy...@gmail.com hsy...@gmail.com
  wrote:
 
   Hi guys,
  
   Is there an example of component configuration? Is there a way to give
   different value to same property for different instances?
  
   Siyuan
  
 

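The pattern Ted describes above, one named component per variant with its own
entries under components, can be sketched as follows. The Python below simply
emits such an appConfig.json; the component names WORKER_SMALL and WORKER_LARGE
and the property mykey are placeholders for illustration, not names from any
Slider release.

import json

# Hedged sketch: two component types of the same application, each carrying
# its own value for the same property. All names here are made up.
app_config = {
    "schema": "http://example.org/specification/v2.0.0",
    "metadata": {},
    "global": {
        "application.def": "app-package-0.1.zip",
        "site.global.app_user": "siyuan"
    },
    "components": {
        "WORKER_SMALL": {"mykey": "small-value"},
        "WORKER_LARGE": {"mykey": "large-value"},
        "slider-appmaster": {"jvm.heapsize": "256M"}
    }
}

with open("appConfig.json", "w") as f:
    json.dump(app_config, f, indent=2)

Each component type then gets its own instance count in resources.json, so
"different instances with different settings" is modeled as different component
types rather than as per-instance overrides of a single component.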


Is it able to make AM to restart from previous state?

2014-11-05 Thread hsy...@gmail.com
Hi guys,

I noticed in the code that when a container fails it will try to relaunch on
the same node. My question is: if I restart the whole application (e.g. the
AM got killed, or I manually restart the app), does Slider try to launch all
containers on the nodes where they were running?

Thanks!

Best,
Siyuan


Re: Is it able to make AM to restart from previous state?

2014-11-05 Thread hsy...@gmail.com
Thanks Steve,

Is No. 1 a new feature in YARN (not released yet)?

And you mentioned Slider saves the locations in history files. What are the
history files and where are they stored? Are they in HDFS?

If one of the previous machines is gone, will it try to get resources from a
newly labeled machine?

Thanks


On Wed, Nov 5, 2014 at 12:46 PM, Steve Loughran ste...@hortonworks.com
wrote:

 On 5 November 2014 20:21, hsy...@gmail.com hsy...@gmail.com wrote:

  Hi guys,
 
  I noticed in the code when a container fails it will try to relaunch from
  the same node. My question is if I restart whole application(Ex. AM got
  killed, or manually restart the app). Does slider try to launch all
  containers from the nodes where it was running?
 
 
 1. If the AM crashes then YARN will restart it. The containers will keep
 working. When the AM comes back up it will work out its state and all
 running containers will stay live. Any containers that were part way
 through starting will be released and new ones requested (there's no record
 of what state they were in, so a clean destroy is simpler)


 If you stop/start the app then it asks for the nodes back on the same
 machines they were on. It saves the locations in history files (look in the
 history subdir to see them).

 Slider tries to read the last entry, going back to previous ones if the
 last one doesn't load. It then asks YARN for containers on those machines.
 There's no guarantee you get them though.

 Looking at the history code last week I noticed one little quirk: it
 doesn't reload the histories if the number of component types has
 increased. It just indexes the entries; more entries means it doesn't know
 how to handle them.

 To avoid this problem, define all your components from the outset, setting
 the instance count to 0 for the ones you don't currently want.

 Thanks!
 
  Best,
  Siyuan
 




Re: Is it able to make AM to restart from previous state?

2014-11-05 Thread hsy...@gmail.com
Steve,

I found out from the code that everything is kept in the history folder in
HDFS.

You mentioned that if I add a new component, the saved history would be
discarded. What if I add more component instances in the configuration? Does
it try to launch the existing instances on their previous nodes and the new
instances on new nodes? What if I decrease the instance count?

Thanks!

Best,
Siyuan

On Wed, Nov 5, 2014 at 1:44 PM, hsy...@gmail.com hsy...@gmail.com wrote:

 Thanks Steve,

 Is No 1  a new feature in YARN (Not released yet)?

 And you mentioned slider saves the location in history files. What are the
 history files and where is it stored? Is it in HDFS?

 If the one of the previous machines is gone, will it try to get resource
 from new labeled machine?

 Thanks


 On Wed, Nov 5, 2014 at 12:46 PM, Steve Loughran ste...@hortonworks.com
 wrote:

 On 5 November 2014 20:21, hsy...@gmail.com hsy...@gmail.com wrote:

  Hi guys,
 
  I noticed in the code when a container fails it will try to relaunch
 from
  the same node. My question is if I restart whole application(Ex. AM got
  killed, or manually restart the app). Does slider try to launch all
  containers from the nodes where it was running?
 
 
 1. If the AM crashes then YARN will restart it. The containers will keep
 working. When the AM comes back up it will work out its state and all
 running containers will stay live. Any containers that were part way
 through starting will be released and new ones requested (there's no
 record
 of what state they were in, so a clean destroy is simpler)


 If you stop/start the app then it asks for the nodes back on the same
 machines they were on. It saves the locations (look in the history subdir)
 to see the history files.

 Slider tries to read the last entry, going back to previous ones if the
 last one doesn't load. It then asks YARN for containers on those machines.
 There's no guarantee you get them though.

 Looking at the history code last week I noticed one little quirk: it
 doesn't reload the histories if the number of component types has
 increased. It just indexes the entries; more entries means it doesn't know
 how to handle them.

 To avoid this problem define all your components from the outset, setting
 the instances count 0 for ones you don't currently want

 Thanks!
 
  Best,
  Siyuan
 




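As the thread above establishes, the AM saves container placements in history
files under a history subdirectory of the application's directory in HDFS. A
hedged sketch for inspecting them with the hdfs CLI follows; the directory
layout, the .json suffix, and the assumption that timestamped file names sort
chronologically are guesses for illustration, not taken from a specific Slider
release.

import subprocess

# Hedged sketch: list the placement-history files the Slider AM saved and dump
# the newest one. Paths and naming below are assumptions.
app_name = "myapp"  # placeholder application/cluster name
history_dir = "/user/siyuan/.slider/cluster/%s/history" % app_name

listing = subprocess.check_output(["hdfs", "dfs", "-ls", history_dir]).decode()
history_files = [line.split()[-1] for line in listing.splitlines()
                 if line.strip().endswith(".json")]

if history_files:
    latest = sorted(history_files)[-1]  # assumes timestamped names sort in order
    print("newest history file: " + latest)
    print(subprocess.check_output(["hdfs", "dfs", "-cat", latest]).decode())
else:
    print("no history files found under " + history_dir)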


Maven repository?

2014-11-05 Thread hsy...@gmail.com
Is there a public Maven repository from which I can check out the Slider library?

Best,
Siyuan


How to inject my own web service handler into app master?

2014-10-30 Thread hsy...@gmail.com
Hi,

I noticed the Slider app master has an embedded web service server. Is there
a way to inject my own web service handler into that server?

Another question: how do you communicate between containers (components) and
the AM (through the agent code)? Is there a way to customize the
communication/data between a container and the AM?

Is there a detailed document about the agent API?

Best,
Siyuan


Re: Why command script is written in python?

2014-10-30 Thread hsy...@gmail.com
Thanks Sumit,

The only concern about a scripting language is that you don't see compile
errors until runtime. And in fact it's not an arbitrary script; it depends on
the library in Slider, for example
/python/resource_management/core/resources/system.py

Best,
Siyuan

On Thu, Oct 30, 2014 at 2:24 PM, Sumit Mohanty sumit.moha...@gmail.com
wrote:

 Actually, you can debug it after deploying it once. This will require
 setting some YARN variables to ensure containers are not released even if
 the application fails. The agent logs report back the commands being
 invoked and the parameters used to invoke them. Let me try to write that up
 on the wiki. *I will need a day or so.*

 The choice of Python was mostly a reflection of going for a scripting
 language that works on Windows and Linux. As such it is possible to use any
 scripting language as long as the agent code is modified to handle various
 script types. Of course, today the only supported/tested one is Python.

 On Thu, Oct 30, 2014 at 1:55 PM, hsy...@gmail.com hsy...@gmail.com
 wrote:

  May I ask why you chose Python to write the command scripts? And is
  there any other language? I find it difficult to debug because each time I
  find an issue I have to change the file, package it, upload it to HDFS, and
  rerun, and I'm not very familiar with Python.
 
 
  Best
  Siyuan
 



 --
 thanks
 Sumit


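For context on the scripts being discussed, here is a minimal sketch of a
Slider app-package command script built on the resource_management library
referenced above (the Script base class and the Execute resource). The
component name, paths, user, and start/stop commands are made-up placeholders;
a real package would normally read them from a params module populated from
the configuration Slider passes to the agent.

from resource_management import *

class MyComponent(Script):

    def install(self, env):
        # Lay down the bits shipped in the app package (assumed helper on Script).
        self.install_packages(env)

    def configure(self, env):
        # Render config files / templates here before the process starts.
        pass

    def start(self, env):
        self.configure(env)
        # Execute is a resource_management resource that runs a shell command
        # as the given user; command and user are placeholders.
        Execute("/opt/myapp/bin/server.sh start", user="siyuan", logoutput=True)

    def stop(self, env):
        Execute("/opt/myapp/bin/server.sh stop", user="siyuan", logoutput=True)

    def status(self, env):
        # A real script would check a pid file here (for example with the
        # library's check_process_status); left as a no-op in this sketch.
        pass

if __name__ == "__main__":
    MyComponent().execute()

The agent drives these methods as lifecycle commands, which is why problems
surface only at runtime, as noted above: the script is interpreted when a
command arrives rather than compiled when the package is built.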

Re: Why command script is written in python?

2014-10-30 Thread hsy...@gmail.com
Logged a ticket here: https://issues.apache.org/jira/browse/SLIDER-590
Feel free to rephrase if anything I said is wrong. Thanks!

Best

On Thu, Oct 30, 2014 at 2:45 PM, Sumit Mohanty smoha...@hortonworks.com
wrote:

 Agree. In fact, we need to also document how Slider scripts can be debugged
 during development.

 If you do not mind can you open a JIRA at
 https://issues.apache.org/jira/browse/SLIDER describing the requirements
 of
 debugging during development time and post development. I will use that
 JIRA to drive the issue.

 -Sumit

 On Thu, Oct 30, 2014 at 2:40 PM, hsy...@gmail.com hsy...@gmail.com
 wrote:

  Thanks Sumit,
 
  The only concern about script language is you don't know the compile
 error
  until runtime. And in fact it's not arbitrary script it depends on the
  library in slider for example
  /python/resource_management/core/resources/system.py
 
  Best,
  Siyuan
 
  On Thu, Oct 30, 2014 at 2:24 PM, Sumit Mohanty sumit.moha...@gmail.com
  wrote:
 
   Actually, you can debug it after deploying it once. This will require
   setting some YARN variables to ensure containers are not released even
 if
   the application fails. The agent logs report back the commands being
   invoked and the parameters used to invoke them. Let me try to write
 that
  up
   on the wiki. *I will need a day or so.*
  
   The choice of python was mostly a reflection of going for scripting
   language that works on Windows and linux. As such it is possible to use
  any
   scripting language as long as the agent code is modified to handle
  various
   script types. Of course, today the only supported/tested one is python.
  
   On Thu, Oct 30, 2014 at 1:55 PM, hsy...@gmail.com hsy...@gmail.com
   wrote:
  
May I ask why do you choose python to write the command script. And
 is
there  any other language? I find difficult to debug because each
 time
   if I
find an issue I have to change file package upload to hdfs, rerun and
  I'm
not very familiar with python
   
   
Best
Siyuan
   
  
  
  
   --
   thanks
   Sumit
  
 



