How to zip 2 sequences based on property (zip, join) (2024)


I would like to zip the items of 2 sequences based on a common property similar to joining them when using enumerables. How can I make the second test pass?

using NUnit.Framework;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Threading.Tasks;

public class SequenceTests
{
    private class Entry
    {
        public Entry(DateTime timestamp, string value)
        {
            Timestamp = timestamp;
            Value = value;
        }

        public DateTime Timestamp { get; }
        public string Value { get; }
    }

    private readonly IEnumerable<Entry> Tasks = new List<Entry>
    {
        new Entry(new DateTime(2021, 6, 6), "Do homework"),
        new Entry(new DateTime(2021, 6, 7), "Buy groceries"), // <-- This date is also in the People collection!
        new Entry(new DateTime(2021, 6, 8), "Walk the dog"),
    };

    private readonly IEnumerable<Entry> People = new List<Entry>
    {
        new Entry(new DateTime(2021, 6, 4), "Peter"),
        new Entry(new DateTime(2021, 6, 5), "Jane"),
        new Entry(new DateTime(2021, 6, 7), "Paul"), // <-- This date is also in the Tasks collection!
        new Entry(new DateTime(2021, 6, 9), "Mary"),
    };

    private class Assignment
    {
        public string Task { get; set; }
        public string Person { get; set; }
    }

    [Test]
    public void Join_two_collections_should_succeed()
    {
        var assignments = Tasks
            .Join(People,
                task => task.Timestamp,
                person => person.Timestamp,
                (task, person) => new Assignment { Task = task.Value, Person = person.Value });

        Assert.AreEqual(1, assignments.Count());
        Assert.AreEqual("Buy groceries", assignments.First().Task);
        Assert.AreEqual("Paul", assignments.First().Person);
    }

    [Test]
    public async Task Zip_two_sequences_should_succeed()
    {
        var tasks = Observable.ToObservable(Tasks);
        var people = Observable.ToObservable(People);

        var sequence = tasks
            .Zip(people)
            .Select(pair => new Assignment { Task = pair.First.Value, Person = pair.Second.Value });

        var assignments = await sequence.ToList();

        Assert.AreEqual(1, assignments.Count);
        Assert.AreEqual("Buy groceries", assignments.First().Task);
        Assert.AreEqual("Paul", assignments.First().Person);
    }
}

Answers

I don't like either of the posted answers. Both are variations on the same theme: keep all members of both sequences in memory indefinitely, iterate over the entire right sequence whenever a new left element comes in, and check against the left keys whenever a new right element comes in. Both answers use O(L + R) memory indefinitely and have O(L * R) time complexity (where L and R are the sizes of the left and right sequences).

If we were dealing with collections (or enumerables), that would be a sufficient answer. But we're not: we're dealing with observables, and the answers should acknowledge that. In an actual use case there could be large time gaps between elements. The question is posed as a test case stemming from an enumerable. If it really were just an enumerable, the right answer would be to convert back to an enumerable and use LINQ's Join. If there's a possibility of a long-running process with time gaps, the answer should acknowledge that you may want to join only elements that occurred within some period of time of each other, releasing memory in the process.
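As a minimal sketch of the enumerable fallback mentioned above, assuming both observables are finite and complete (otherwise the enumeration blocks forever): the names EnumerableJoinSketch and JoinFinite are hypothetical, and the tuple shape stands in for the question's Entry class. It assumes the System.Reactive package is referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

public static class EnumerableJoinSketch
{
    // Hypothetical helper: converts both observables back to enumerables
    // and lets LINQ to Objects do a keyed join on the timestamp.
    public static List<(string TaskName, string PersonName)> JoinFinite(
        IObservable<(DateTime Timestamp, string Value)> tasks,
        IObservable<(DateTime Timestamp, string Value)> people)
    {
        return tasks.ToEnumerable()              // blocks until 'tasks' completes
            .Join(people.ToEnumerable(),         // blocks until 'people' completes
                  t => t.Timestamp,
                  p => p.Timestamp,
                  (t, p) => (TaskName: t.Value, PersonName: p.Value))
            .ToList();
    }
}
```

This only makes sense when the sources are known to be short-lived; for a long-running stream the time-boxed approach is the safer choice.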

The following satisfies the test while allowing for a time box:

var sequence = tasks.Join(people,
        _ => Observable.Timer(TimeSpan.FromSeconds(.5)),
        _ => Observable.Timer(TimeSpan.FromSeconds(.5)),
        (t, p) => (task: t, person: p)
    )
    .Where(t => t.person.Timestamp == t.task.Timestamp)
    .Select(t => new Assignment { Task = t.task.Value, Person = t.person.Value });

This creates a 0.5-second window for each element, meaning a left element and a right element will match if they are produced within 0.5 seconds of each other. After 0.5 seconds, each element is released from memory. If, for whatever reason, you wanted to hold all objects in memory indefinitely rather than release them, this would suffice:

// Unit lives in the System.Reactive namespace (using System.Reactive;).
var sequence = tasks.Join(people,
        _ => Observable.Never<Unit>(),
        _ => Observable.Never<Unit>(),
        (t, p) => (task: t, person: p)
    )
    .Where(t => t.person.Timestamp == t.task.Timestamp)
    .Select(t => new Assignment { Task = t.task.Value, Person = t.person.Value });
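To illustrate the trade-off of the time-boxed variant, here is a rough sketch in which the right element arrives about one second after the left one. The class and method names (JoinWindowSketch, JoinWithWindow) and the string payloads are made up for illustration, and the timings rely on real timers, so treat it as a demonstration rather than a deterministic test. It assumes the System.Reactive package is referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class JoinWindowSketch
{
    // Hypothetical helper: joins one immediate left element with one right
    // element that arrives after roughly one second, using the given window
    // as the duration for both sides.
    public static async Task<IList<(string Left, string Right)>> JoinWithWindow(TimeSpan window)
    {
        var left = Observable.Return("left");
        var right = Observable.Timer(TimeSpan.FromSeconds(1)).Select(_ => "right");

        return await left
            .Join(right,
                  _ => Observable.Timer(window),   // how long a left element stays joinable
                  _ => Observable.Timer(window),   // how long a right element stays joinable
                  (l, r) => (Left: l, Right: r))
            .ToList();
    }
}
```

With a window of 0.5 seconds the left element has already expired when the right one arrives, so no pair should be produced; with a window of 2 seconds the two overlap and one pair should come out. In other words, a shorter window frees memory sooner at the cost of possibly missing late matches.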

Here is a custom Join operator that could be used in order to solve this problem. It is based on the Merge, GroupByUntil and SelectMany operators:

/// <summary>
/// Correlates the elements of two sequences based on matching keys. Results are
/// produced for all combinations of correlated elements that have an overlapping
/// duration.
/// </summary>
public static IObservable<TResult> Join<TLeft, TRight, TKey, TResult>(
    this IObservable<TLeft> left,
    IObservable<TRight> right,
    Func<TLeft, TKey> leftKeySelector,
    Func<TRight, TKey> rightKeySelector,
    Func<TLeft, TRight, TResult> resultSelector,
    TimeSpan? keyDuration = null,
    IEqualityComparer<TKey> keyComparer = null)
{
    // Arguments validation omitted
    keyComparer ??= EqualityComparer<TKey>.Default;
    var groupDuration = keyDuration.HasValue ?
        Observable.Timer(keyDuration.Value) : Observable.Never<long>();
    return left
        .Select(x => (x, (TRight)default, Type: 1, Key: leftKeySelector(x)))
        .Merge(right.Select(x => ((TLeft)default, x, Type: 2, Key: rightKeySelector(x))))
        .GroupByUntil(e => e.Key, _ => groupDuration, keyComparer)
        .Select(g => (
            g.Where(e => e.Type == 1).Select(e => e.Item1),
            g.Where(e => e.Type == 2).Select(e => e.Item2).Replay().AutoConnect(0)
        ))
        .SelectMany(g => g.Item1.SelectMany(_ => g.Item2, resultSelector));
}

Usage example:

IObservable<Assignment> sequence = tasks
    .Join(people, t => t.Timestamp, p => p.Timestamp,
        (t, p) => new Assignment { Task = t.Value, Person = p.Value });

It should be noted that this problem cannot be solved with guaranteed 100% correctness without buffering all the elements that the two source sequences produce. Obviously this is not going to scale well if the sequences contain an unbounded number of elements.

If sacrificing absolute correctness in favor of scalability is acceptable, the optional keyDuration argument can be used to configure the maximum duration that a stored key (and its associated elements) is preserved in memory. An expired key can potentially be reborn if new elements having this key are produced by the left or right sequences.

The above implementation performs reasonably well with sequences containing a large number of elements. Joining two same-sized sequences, each having 100,000 elements, takes ~8 seconds on my PC.

The observable Zip operator works just the same as the enumerable version. You didn't use that in the first test, so it's not likely to be the operator you need here.
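A small sketch of what Zip does, using the enumerable version and the values from the question (the class and method names are made up for illustration). Zip pairs elements by position and stops at the end of the shorter sequence; it never looks at the timestamps.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class ZipByPositionSketch
{
    // Hypothetical helper: pairs the question's tasks and people by position.
    public static List<string> PairByPosition()
    {
        var tasks = new[] { "Do homework", "Buy groceries", "Walk the dog" };
        var people = new[] { "Peter", "Jane", "Paul", "Mary" };

        // First with first, second with second, and so on.
        // "Mary" has no counterpart and is dropped.
        return tasks.Zip(people, (task, person) => $"{task} -> {person}").ToList();
    }
}
```

This yields three pairs, with "Buy groceries" paired with "Jane" rather than "Paul", which is why the second test in the question expects one assignment but gets three.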

What you need is simply the SelectMany operator.

Try this query:

var sequence =
    from t in tasks
    from p in people
    where t.Timestamp == p.Timestamp
    select new Assignment { Task = t.Value, Person = p.Value };

That works with your test.
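For reference, the query syntax above corresponds roughly to the following method-syntax sketch. The class and method names (SelectManySketch, Match) are hypothetical, tuples stand in for the question's Entry and Assignment classes, and the System.Reactive package is assumed to be referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class SelectManySketch
{
    // Hypothetical helper: pairs every task with every person,
    // then keeps only the pairs whose timestamps are equal.
    public static async Task<IList<(string TaskName, string PersonName)>> Match(
        IObservable<(DateTime Timestamp, string Value)> tasks,
        IObservable<(DateTime Timestamp, string Value)> people)
    {
        return await tasks
            // 'people' is subscribed to once per task element.
            .SelectMany(_ => people, (t, p) => (T: t, P: p))
            .Where(pair => pair.T.Timestamp == pair.P.Timestamp)
            .Select(pair => (TaskName: pair.T.Value, PersonName: pair.P.Value))
            .ToList();
    }
}
```

Note that SelectMany subscribes to people again for every task element. That is fine for a cold source such as a list converted with ToObservable, but a hot source would only deliver the elements produced after each subscription, so earlier people could be missed.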
