IObservable TakeLast (n) және бұғаттау

Мен бақыланатын 20 ең соңғы құндылықтарды алуға тырысамын және оны тосқауылсыз мүлік ретінде ашуға тырысамын. Қазіргі кезде менің кодым ұқсас:

class Foo
{
    private IObservable observable;

    public Foo(IObservable bar)
    {
        this.observable = bar;
    }

    public IEnumerable MostRecentBars
    {
        get 
        {
             return this.observable.TakeLast(20).ToEnumerable();
        }
    }
 }

Алайда, MostRecentBars getter шақырылғанда, бұл бұғатталады, себебі, кем дегенде, 20 мәндер болғанша ToEnumerable қайтарылмайды.

Блокталусыз бақыланатын ең соңғы 20 ең көп мәндерді шығаруға арналған кіріктірілген жол бар ма? Егер 20-дан аз бақыланатын құндылықтар болса, онда олардың барлығын қайтару керек.

1
IObservable ішіндегі TakeLast әдісі жоқ
қосылды автор ojlovecd, көзі

4 жауаптар

Мен сізге екі нұсқаны беремін. Бір Rx Scan операторын пайдаланады, бірақ менің ойымша, оны оқуға біраз қиынырақ етеді. Екіншісі бұғатталған стандартты Queue пайдаланады. Сіз таңдай аласыз.

(1)

class Foo
{
    private int[] bars = new int[] { };

    public Foo(IObservable bar)
    {
        bar
            .Scan(
                new int[] { },
                (ns, n) =>
                    ns
                        .Concat(new [] { n, })
                        .TakeLast(20)
                        .ToArray())
            .Subscribe(ns => bars = ns);
    }

    public IEnumerable MostRecentBars
    {
        get 
        {
            return bars;
        }
    }
}

(2)

class Foo
{
    private Queue queue = new Queue();

    public Foo(IObservable bar)
    {
        bar.Subscribe(n =>
        {
            lock (queue)
            {
                queue.Enqueue(n);
                if (queue.Count > 20)
                {
                    queue.Dequeue();
                }
            }
        });
    }

    public IEnumerable MostRecentBars
    {
        get 
        {
            lock (queue)
            {
                return queue.ToArray();
            }
        }
    }
}

Мен бұл көмекке сенемін.

2
қосылды
Енді Ильин Пинзонның Queue енгізуін әлдеқайда көп жазғанын көріп, мен бұл тәсілмен ең жақсы ставка ретінде барамын. Бұл шешімдеріміз қаншалықты жақын екенін таңқаларлық. Мен оны орналастырғаннан кейінгі кезге дейін оған қараған жоқпын. Дегенмен, Scan операторы жиі назардан тыс қалады, бірақ бұл көптеген жағдайларда өте пайдалы.
қосылды автор Enigmativity, көзі
Scan -ді қолданып, бірінші әдісті ұнатамын, ол «соңғы (ең көп) n өту» шегінен тыс бақыланатын тізбекті жалғастыруға мүмкіндік береді. Шын мәнінде, осы мақсат үшін кеңейтілім әдісіне айналдыру жақсы болар еді, яғни var recentAverageScore = scores.TakeUpToLast (25). Орташа (); немесе IObservable нәтижесі.
қосылды автор Joe Skeen, көзі

Менің реактивті кеңейтіммен салынған кез-келген жобаға қосылуға болатын бірнеше кеңейтім бар, олардың біреуі жылжымалы терезе болып табылады:

public static IObservable> SlidingWindow(this IObservable o, int length)
{
   Queue window = new Queue();

    return o.Scan>(new T[0], (a, b) =>
    {
        window.Enqueue(b);
        if (window.Count > length)
            window.Dequeue();
        return window.ToArray();
    });
}

Бұл соңғы N элементтерінің жиымын қайтарады (немесе әлі N элементтері болмаған жағдайда кем).

Сіздің ісіңіз үшін, сіз:

class Foo
{
    private IObservable observable;
    private int[] latestWindow = new int[0];

    IDisposable slidingWindowSubscription;

    public Foo(IObservable bar)
    {
        this.observable = bar;
        slidingWindowSubscription = this.observable.SlidingWindow(20).Subscribe(a =>
            {
                latestWindow = a;
            });
    }

    public IEnumerable MostRecentBars
    {
        get 
        {
             return latestWindow;
        }
    }
}
1
қосылды

Сіз өзіңіздің жауабыңызды алға тартсаңыз да, мен оны Replay Subject-ды аралықпен қолданып шешіп, келесідей нәрсеге келдім:

class Foo
{
    private ReplaySubject replay = new ReplaySubject(20);

    public Foo(IObservable bar)
    {
        bar.Subscribe(replay);
    }

    public IEnumerable MostRecentBars
    {
        get
        {
            var result = new List();
            replay.Subscribe(result.Add); //Replay fill in the list with buffered items on same thread
            return result;
        }
    }
}

Мәселеңізге сәйкес келетінін білемін.

0
қосылды

Мен сіздің талаптарыңызға сəйкес келетін кірістірілген Rx операторы туралы ойламаймын. Сіз оны келесідей түрде іске асыра аласыз:

class Foo
{
    private IObservable observable;
    private Queue buffer = new Queue();

    public Foo(IObservable bar)
    {
        this.observable = bar;

        this.observable
            .Subscribe(item =>
            {
                lock (buffer)
                {
                    if (buffer.Count == 20) buffer.Dequeue();
                    buffer.Enqueue(item);
                }
            });
    }

    public IEnumerable MostRecentBars
    {
        get
        {
            lock (buffer)
            { 
                return buffer.ToList();    //Create a copy.
            }
        }
    }
}
0
қосылды