How to process data from generator as it become available?

Consider the following Python code (3.6.1):

import shutil
import requests
import hashlib
import concurrent.futures as futures
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor

urls = ['',

def download_file(url):
    filename = url.split('/')[-1]
    log(f'Starting download {url}')
    r = requests.get(url, stream=True)
    with open(filename, 'wb') as f:
        shutil.copyfileobj(r.raw, f)
    log(f'Finished download {url}')
    return filename

def sha256_file(filename, block_size=65536):
    sha256 = hashlib.sha256()
    log(f'Hashing file {filename}...')
    with open(filename, 'rb') as f:
        for block in iter(lambda:, b''):
    log(f'Finished hashing file {filename}')
    return filename, sha256.hexdigest()

def parallel(n, fn, data):
    with ThreadPoolExecutor(max_workers=n) as exe:
        jobs = (exe.submit(fn, d) for d in data)
        for job in futures.as_completed(jobs):
            yield job.result()

def log(msg):
    print(f'{} {msg}')

def main():
    files = parallel(3, download_file, urls)
    hashes = parallel(2, sha256_file, files)
    for f, h in hashes:
        log(f'File: {f} -> Hash: {h}')

if __name__ == '__main__':

The idea of ‘parallel’ generator is to create multi-threaded pipeline stages. The first stage (download_file) runs with 3 threads, and as soon as it gets each file, the second stage (sha256_file) process the file. This is the output generated:

2017-04-01 23:16:19.965946 Starting download
2017-04-01 23:16:19.966431 Starting download
2017-04-01 23:16:19.966931 Starting download
2017-04-01 23:16:20.683722 Finished download
2017-04-01 23:16:20.684722 Hashing file test100k.db...
2017-04-01 23:16:20.685256 Finished hashing file test100k.db
2017-04-01 23:16:23.600355 Finished download
2017-04-01 23:16:23.600873 Hashing file test1Mb.db...
2017-04-01 23:16:23.605420 Finished hashing file test1Mb.db
2017-04-01 23:16:49.962977 Finished download
2017-04-01 23:16:49.963463 Hashing file test10Mb.db...
2017-04-01 23:16:49.963975 File: test100k.db -> Hash: f627ca4c2c322f15db26152df306bd4f983f0146409b81a4341b9b340c365a16
2017-04-01 23:16:49.963975 File: test1Mb.db -> Hash: 30e14955ebf1352266dc2ff8067e68104607e750abb9d3b36582b8af909fcb58
2017-04-01 23:16:50.003094 Finished hashing file test10Mb.db
2017-04-01 23:16:50.003094 File: test10Mb.db -> Hash: e5b844cc57f57094ea4585e235f36c78c1cd222262bb89d53c94dcb4d6b3e55d

According to the logs, we can see that the stages one and two are working fine, but after the last stage, the for loop ‘for f, h in hashes’ only gets the results from the last stage after receiving all data. How to make it process the data as it become available?

