# Implement K-Way Merge with std::ranges

## Intro

Merging sorted ranges into one sorted range is a common task.
In the standard library, we have `std::ranges::merge`

:

```
// Merges two sorted ranges r1, r2 into one sorted range beginning at result.
template< ranges::input_range R1, ranges::input_range R2,
class O /*, ... */ >
constexpr /* ... */ merge( R1&& r1, R2&& r2, O result /*, ... */ );
```

But it’s for merging exactly 2 ranges.
The general form of the problem is: given *k* sorted ranges, merge them into one sorted range.

This is known as k-way merge. Let’s implement it.

## API Design

First of all, what should our API look like? There isn’t a precedent function in the standard `<algorithm>`

library that takes multiple ranges as input. They take at most two (and usually one) ranges.

But even before that, we need to ask: do we know `k`

at compile time? If so, the API can look like:

```
template< ranges::input_range ... Rs,
class O>
constexpr /* ... */ k_merge( Rs&& ... rs, O result);
// usage:
// k_merge(r1, r2, out);
// k_merge(r1, r2, r3, out);
```

Otherwise (`k`

only known at runtime), we have to introduce “range of ranges”:

```
template< ranges::input_range Rs,
class O>
constexpr /* ... */ k_merge( Rs&& rs, O result);
// usage:
// k_merge({r1, r2}, out);
// k_merge({r1, r2, r3}, out);
```

It turns out they are two very different problems. Today we’ll only talk about the runtime case as it is more general and easier to deal with - all input ranges are of the same type.

To keep it simple, we’ll simply return `result`

(like `std::ranges`

), and leaving the custom comparison function out:

```
template< ranges::input_range Rs,
class O>
constexpr O k_merge( Rs&& rs, O result);
```

## Normalize

The first problem is, the `input_range`

is too limited.
Remember that `input_range`

means it’s only good for one-pass algorithms; but even the “scan the minimum” algorithm is multi-pass: it needs to visit each element (input sorted range) multiple times. The only single-pass way is brute-force: copy all elements into a big array, sort that array, and output. Clearly that’s not efficient.

We can change our API from `input_range`

to, for example, `forward_range`

. But that would make it less functional, and even then, `forward_range`

is not enough for the more efficient heap-based algorithm which requires `random_access_range`

. And even we go all in, the input ranges may be non-modifiable, so we can’t really `pop_front()`

each input range of `rs`

in-place.

The answer is: copy the iterators into a more powerful container, say `std::vector`

, and operate on that instead. For each input range, we turn it into pair of `begin()`

and `end()`

. So we need a type holds two possibly-different-type values.

We could use `std::pair`

, but there is a better option: `std::ranges::subrange`

. It comes with handy member functions including `advance()`

that essentially drops the first element(s).

Therefore, the first part of our function is to *normalize* the inputs:

```
template< std::ranges::input_range Rs,
class O>
constexpr O k_merge( Rs&& rs, O result) {
using range_t = ranges::range_value_t<Rs>; // input sorted range
using subrange_t = ranges::subrange< // our cheap-to-copy view of the input range
ranges::iterator_t<range_t>,
ranges::sentinel_t<range_t>
>;
std::vector<subrange_t> sub_rs;
for (auto&& r: rs) {
subrange_t sub_r{r};
if (!sub_r.empty()) {
sub_rs.push_back(std::move(sub_r));
}
}
// ...
}
```

Here we only keep the non-empty ranges. We’ll see why.

## Linear Scan

Let’s implement the straightforward approach - scan the minimum of `k`

ranges each iteration.

We first need a comparison function to compare two input ranges.

```
auto cmp_rg = [](const subrange_t& lhs, const subrange_t& rhs) {
return *lhs.begin() < *rhs.begin();
};
```

Had we not dropped the non-empty ranges, we would have to check for emptiness here.

Next, the main loop:

```
while (!sub_rs.empty()) {
auto it_min = ranges::min_element(sub_rs, cmp_rg);
auto& min_rg = *it_min;
*result++ = *min_rg.begin();
min_rg.advance(1);
if (min_rg.empty()) {
// maintain the invariance that `sub_rs` holds non-empty ranges
sub_rs.erase(it_min);
}
}
```

The time complexity is `O(n * k)`

where `n`

is the total number of elements.

## Heap

When `k`

is large, it is expensive to do `min_element`

on each loop. Instead we can maintain a heap of the minimums:

```
auto cmp_rg = [](const subrange_t& lhs, const subrange_t& rhs) {
// std::make_heap produces a max heap by default, so we need to revert the side here.
return *rhs.begin() < *lhs.begin();
};
std::ranges::make_heap(rs, cmp_rg);
while (!sub_rs.empty()) {
std::ranges::pop_heap(rs, cmp_rg);
auto& min_rg = rs.back();
*result++ = *min_rg.begin();
min_rg.advance(1);
if (min_rg.empty()) {
rs.pop_back();
} else {
std::ranges::push_heap(rs, cmp_rg);
}
}
```

This reduces the time complexity to `O(n * log(k))`

.

## Make it Stable

Note that there is one minor problem of the above heap-based algorithm - it isn’t stable.
Here, being stable means that if two elements from two different input ranges compare equal,
the one whose input range appears first in `rs`

will appear first in the output.

The fix is to introduce a unique index for each input range to serve as the tie breaker:

```
struct index_subrange_t : public subrange_t {
std::size_t index{}; // tie-breaker
// ... add the necessary constructors ...
};
std::vector<index_subrange_t> sub_rs;
std::size_t i = 0;
for (auto&& r: rs) {
index_subrange_t sub_r{r};
if (!sub_r.empty()) {
sub_r.index = i++;
sub_rs.push_back(std::move(sub_r));
}
}
```

and modify the comparison function:

```
auto cmp_rg = [](const index_subrange_t& lhs, const index_subrange_t& rhs) {
const auto& top1 = *lhs.begin();
const auto& top2 = *rhs.begin();
if (top2 < top1) return true;
else if (top1 < top2) return false;
else return rhs.index < lhs.index;
};
```

## Algorithm selection

While heap-based algorithms give the best time complexity, they aren’t necessarily the fastest when `k`

isn’t big.
This is because moving items around the heap, while cheap, isn’t free.

What we can do it to choose the algorithm based on `k`

: only employ the heap algorithm when `k`

reaches some predefined threshold.

In fact, while we are at it:

- If
`k == 0`

, call it a day! - Else If
`k == 1`

, simply`std::ranges::copy`

to output - Else If
`k == 2`

, use`std::ranges::merge`

- Else If
`k < THRESHOLD`

, use linear scanning - Else, use heap

In addition, we won’t just select algorithm one-off at the start; we keep re-evaluating whether we should switch algo each time `k`

changes - decreases by 1 when an input range becomes empty and is removed from `sub_rs`

.

In fact, if you check the actual implementations (libstdc++, libc++) of `std::merge`

, you’ll see that they fallback to `std::copy`

whenever an input range becomes empty. Essentially the same idea.