Igor's Techno Club

Case Study: Aggregating a Billion Rows File in Under 6 Seconds

Let's examine one of the submissions to the One Billion Row Challenge (1BRC) and see how the problem was approached and which programming concepts were used to solve it.

Problem statement

Given a file containing temperature records in the following format:

Hamburg;12.0
Bulawayo;8.9
Palembang;38.8
St. John's;15.2
Cracow;12.6
Bridgetown;26.9
Istanbul;6.2
Roseau;34.4
Conakry;31.2
Istanbul;23.0

the task is to process a file containing 1 billion records as fast as possible and calculate the min, mean, and max values for every city.

Output example (min/mean/max):

Abha=-23.0/18.0/59.2

Solution

I chose this submission as an example because it's short, easy to understand, and fast.

Let's start from the high-level design of this approach.

Design approach

Data processing is done in parallel: each thread works on its own chunk of data and stores intermediate results in its own hash table. When all threads are done, the per-chunk tables are merged together and the result is printed out. Let's dig deeper into each of these ideas.

Work parallelization

Since practically every machine today has a multicore CPU, it makes sense to use all of the cores. Not every task can be split up and done in parallel, but data aggregation is one where it is possible. In the following example, we split the file into segments and process them in parallel.

var resultsMap = getFileSegments(file).stream().map(segmentProcessing()).parallel()
        .flatMap(partition -> partition.getAll().stream())
        .collect(Collectors.toMap(e -> new String(e.key()), Entry::value,
                CalculateAverage_spullara::merge, TreeMap::new));
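To make the shape of this pipeline easier to see, here is a minimal sketch of the same split, process, and merge idea. It is not the submission's code: Stats, processChunk, and aggregate are hypothetical names of mine, and the chunks are small in-memory lists instead of file segments.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class ParallelMergeSketch {
    // Hypothetical stand-in for a per-city aggregate: min, max, sum, count.
    record Stats(double min, double max, double sum, long count) {
        static Stats of(double v) {
            return new Stats(v, v, v, 1);
        }

        Stats merge(Stats o) {
            return new Stats(Math.min(min, o.min), Math.max(max, o.max), sum + o.sum, count + o.count);
        }
    }

    // Aggregates one chunk of "city;temp" lines into a map local to this chunk.
    static Map<String, Stats> processChunk(List<String> lines) {
        Map<String, Stats> local = new HashMap<>();
        for (String line : lines) {
            int sep = line.indexOf(';');
            String city = line.substring(0, sep);
            double temp = Double.parseDouble(line.substring(sep + 1));
            local.merge(city, Stats.of(temp), Stats::merge);
        }
        return local;
    }

    // Processes chunks in parallel, then merges entries for the same city into a sorted map.
    static TreeMap<String, Stats> aggregate(List<List<String>> chunks) {
        return chunks.parallelStream()
                .map(ParallelMergeSketch::processChunk)
                .flatMap(m -> m.entrySet().stream())
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, Stats::merge, TreeMap::new));
    }
}
```

Each chunk only touches its own map, so no locking is needed while processing. The only place where results from different threads meet is the merge function passed to Collectors.toMap.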

Splitting a file into segments

The number of file segments equals the number of available CPU cores, and each segment size is calculated based on the file length. Note that getting the size of the file is a cheap operation, so we use it to our advantage here.

int numberOfSegments = Runtime.getRuntime().availableProcessors();
long fileSize = file.length();
long segmentSize = fileSize / numberOfSegments;
List<FileSegment> segments = new ArrayList<>(numberOfSegments);

Once we know the segment size, we need to find where each segment starts and ends without reading the whole file sequentially. Random access lets us jump straight to any offset in the file. In Java, it can be achieved with this code:

RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r");

The only tricky thing about file segments is that a boundary can land in the middle of a line, so one segment could end like this:

Hamburg;12.0
Bulawayo;8.9
Palem

and the next segment could start like this:

bang;38.8
St. John's;15.2

To avoid such incomplete lines, we move each boundary (segStart and segEnd) forward until we pass the next \n character:

if (i != skipSegment) {
    raf.seek(location);
    while (location < fileSize) {
        location++;
        if (raf.read() == '\n')
            break;
    }
}
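Putting the two previous snippets together, segment splitting could look roughly like the sketch below. This is my own simplified reconstruction, not the submission's getFileSegments: the Segment record and the split and alignToNextLine helpers are hypothetical names, and the number of segments is passed in so the behavior is easy to check on a small file.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.List;

public class SegmentSketch {
    // Hypothetical holder for a half-open byte range [start, end).
    record Segment(long start, long end) {}

    // Moves a tentative boundary forward to just past the next '\n' (or to the end of the file).
    static long alignToNextLine(RandomAccessFile raf, long location, long fileSize) throws IOException {
        raf.seek(location);
        while (location < fileSize) {
            location++;
            if (raf.read() == '\n') {
                break;
            }
        }
        return location;
    }

    static List<Segment> split(File file, int numberOfSegments) throws IOException {
        List<Segment> segments = new ArrayList<>(numberOfSegments);
        try (RandomAccessFile raf = new RandomAccessFile(file, "r")) {
            long fileSize = raf.length();
            long segmentSize = fileSize / numberOfSegments;
            long start = 0;
            for (int i = 0; i < numberOfSegments && start < fileSize; i++) {
                // The last segment always runs to the end of the file.
                long end = (i == numberOfSegments - 1)
                        ? fileSize
                        : alignToNextLine(raf, Math.max(start, (i + 1) * segmentSize), fileSize);
                segments.add(new Segment(start, end));
                start = end;
            }
        }
        return segments;
    }
}
```

Because every segment starts where the previous one ended, each line belongs to exactly one segment and no line is cut in half.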

Mapping file to memory

Once we have a file segment representing a portion of the file, we map that portion directly into memory to speed up further processing of the segment, especially for larger files:

try (FileChannel fileChannel = (FileChannel) Files.newByteChannel(Paths.get(filename), StandardOpenOption.READ)) {
    MappedByteBuffer bb = fileChannel.map(FileChannel.MapMode.READ_ONLY, segment.start, segmentEnd - segment.start);
    // ... read and aggregate the mapped bytes here
}
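As a small self-contained illustration of memory mapping, the sketch below maps a byte range of a file and counts the lines in it. The class and method names are hypothetical and the line counting is only a stand-in for the real aggregation. One thing to keep in mind: a single FileChannel.map call cannot map more than Integer.MAX_VALUE bytes, so very large segments would have to be split further.

```java
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class MmapSketch {
    // Maps the byte range [start, end) of a file read-only and counts the '\n' bytes in it.
    static int countLines(Path path, long start, long end) throws IOException {
        try (FileChannel channel = FileChannel.open(path, StandardOpenOption.READ)) {
            MappedByteBuffer bb = channel.map(FileChannel.MapMode.READ_ONLY, start, end - start);
            int lines = 0;
            // Indexes into the buffer are relative to `start`, not to the beginning of the file.
            for (int i = 0; i < bb.limit(); i++) {
                if (bb.get(i) == '\n') {
                    lines++;
                }
            }
            return lines;
        }
    }
}
```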

Once the file is mapped into memory, we read it byte by byte and extract the information we need. Once a record is parsed, we put it into the result map:

resultMap.putOrMerge(buffer, 0, offset, temp / 10.0, hash);

where buffer holds the bytes of the current city name, offset is the number of those bytes, and hash is the hash code of the current city. The hash plays an important role here, but I will get to it a bit later. This is how the hash is [calculated](https://stackoverflow.com/questions/299304/why-does-javas-hashcode-in-string-use-31-as-a-multiplier):

while (currentPosition != segmentEnd && (b = bb.get(currentPosition++)) != ';') {
    buffer[offset++] = b;
    hash = 31 * hash + b;
}
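Here is a sketch of how parsing a single record could work end to end. The name loop mirrors the snippet above; the temperature part is my own assumption, based on the temp / 10.0 expression: it reads the value as an integer number of tenths and assumes exactly one digit after the decimal point. Parsed and parseLine are hypothetical names, and the 128-byte buffer is an arbitrary size chosen for the example.

```java
import java.nio.charset.StandardCharsets;

public class ParseSketch {
    // Hypothetical holder for one parsed record.
    record Parsed(String city, int hash, int tenths) {}

    static Parsed parseLine(byte[] line) {
        byte[] buffer = new byte[128]; // assumed upper bound on the city name length
        int pos = 0;
        int offset = 0;
        int hash = 0;
        byte b;
        // Copy the city name and fold each byte into the hash, the same way String.hashCode() folds chars.
        while (pos < line.length && (b = line[pos++]) != ';') {
            buffer[offset++] = b;
            hash = 31 * hash + b;
        }
        // Read the temperature as an integer number of tenths, e.g. "-12.3" becomes -123.
        int tenths = 0;
        boolean negative = false;
        while (pos < line.length && (b = line[pos++]) != '\n') {
            if (b == '-') {
                negative = true;
            } else if (b != '.') {
                tenths = tenths * 10 + (b - '0');
            }
        }
        String city = new String(buffer, 0, offset, StandardCharsets.UTF_8);
        return new Parsed(city, hash, negative ? -tenths : tenths);
    }
}
```

For pure ASCII names the computed hash is identical to String.hashCode() of the city name. For names with non-ASCII characters it differs, because the loop works on UTF-8 bytes rather than chars, but it is still consistent for the same city, which is all the hash table needs.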

Storing segment's results

ByteArrayToResultMap is a class created specifically for storing results for each file segment.

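Internally, it has an array of Result objects, Result[] slots = new Result[MAPSIZE];, where the array index is a city's slot id. The id is calculated by a bitwise operation, int slot = hash & (slots.length - 1);, and this is the place where the calculated hash is needed. Because the array length is a power of two, the mask keeps only the low bits of the hash, so the result is always a valid index, even for a negative hash.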

Even though the slots array is huge (1024 * 128 elements), two different cities can still end up in the same slot. Such a hash collision is handled by linear probing: we step to the next slot until we find either the slot that already holds this city or an empty one:

while (slotValue != null && (keys[slot].length != size || !Arrays.equals(keys[slot], 0, size, key, offset, size))) {
    slot = (slot + 1) & (slots.length - 1);
    slotValue = slots[slot];
}

where keys is a parallel array that stores, for each slot id, the city name as a byte array.

Once we have found the slot, we either put a new Result object into it if it is empty, or, if it already holds a value, update its min, max, sum, and count:

if (value == null) {
    slots[slot] = new Result(temp);
    byte[] bytes = new byte[size];
    System.arraycopy(key, offset, bytes, 0, size);
    keys[slot] = bytes;
} else {
    value.min = Math.min(value.min, temp);
    value.max = Math.max(value.max, temp);
    value.sum += temp;
    value.count += 1;
}
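The pieces above fit together into a small open-addressing hash table. Below is a compact sketch of the idea, not a copy of ByteArrayToResultMap: the class name, the much smaller MAP_SIZE, and the get method are my own additions for the example, and the key comparison passes offset + size as the end index so that it also works for a non-zero offset. Like the original, it never resizes, so it assumes the number of distinct cities stays well below the table size.

```java
import java.util.Arrays;

public class ProbingMapSketch {
    static final int MAP_SIZE = 1024; // must be a power of two for the mask below to work

    static final class Result {
        double min, max, sum;
        long count;

        Result(double value) {
            min = max = sum = value;
            count = 1;
        }
    }

    private final Result[] slots = new Result[MAP_SIZE];
    private final byte[][] keys = new byte[MAP_SIZE][];

    void putOrMerge(byte[] key, int offset, int size, double temp, int hash) {
        int slot = hash & (slots.length - 1);
        // Linear probing: step forward until the slot is empty or already holds this key.
        while (slots[slot] != null
                && !Arrays.equals(keys[slot], 0, keys[slot].length, key, offset, offset + size)) {
            slot = (slot + 1) & (slots.length - 1);
        }
        Result value = slots[slot];
        if (value == null) {
            slots[slot] = new Result(temp);
            keys[slot] = Arrays.copyOfRange(key, offset, offset + size);
        } else {
            value.min = Math.min(value.min, temp);
            value.max = Math.max(value.max, temp);
            value.sum += temp;
            value.count += 1;
        }
    }

    // Lookup follows the same probe sequence as putOrMerge; returns null if the key is absent.
    Result get(byte[] key, int hash) {
        int slot = hash & (slots.length - 1);
        while (slots[slot] != null && !Arrays.equals(keys[slot], key)) {
            slot = (slot + 1) & (slots.length - 1);
        }
        return slots[slot];
    }
}
```

Compared to a general-purpose HashMap<String, Result>, this layout avoids creating a String for every line and avoids boxing: a key is only copied the first time a city is seen.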

Final step

After all file segments are processed, we need to merge the per-segment results for the same city:

.collect(Collectors.toMap(e -> new String(e.key), (entry) -> entry.value, (e1, e2) -> merge((Result) e1, (Result) e2), TreeMap::new));

where entry.value is a Result object.

In the end, we put the results into a TreeMap so that the keys (city names) are sorted. When printing a Result object, we calculate the mean value on the spot:

return round(min) + "/" + round(sum / count) + "/" + round(max);
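For completeness, this is how merging two partial results and formatting the output could look. The article does not show the bodies of merge and round, so both are my assumptions here: merge combines the four fields, and round keeps one decimal place using Math.round. ResultSketch is a hypothetical name.

```java
public class ResultSketch {
    double min, max, sum;
    long count;

    ResultSketch(double value) {
        this(value, value, value, 1);
    }

    private ResultSketch(double min, double max, double sum, long count) {
        this.min = min;
        this.max = max;
        this.sum = sum;
        this.count = count;
    }

    // Combines two partial aggregates for the same city (assumed behavior of merge).
    static ResultSketch merge(ResultSketch a, ResultSketch b) {
        return new ResultSketch(Math.min(a.min, b.min), Math.max(a.max, b.max), a.sum + b.sum, a.count + b.count);
    }

    // Rounds to one decimal place (assumed behavior of round).
    static double round(double value) {
        return Math.round(value * 10.0) / 10.0;
    }

    @Override
    public String toString() {
        // The mean is never stored; it is derived from sum and count only when printing.
        return round(min) + "/" + round(sum / count) + "/" + round(max);
    }
}
```

Storing sum and count instead of a running mean is what makes the merge step trivial: partial sums and counts from different segments can simply be added.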

Conclusion

Even though this code was written in Java, its concepts and ideas can be applied in almost any language. This case supports the idea that performant code can still be concise and elegant.

Studied by Igor

#1brc #java