How to process data from a generator as it becomes available?

Alexandre Verri

Consider the following Python 3.6.1 code:

import shutil
import requests
import hashlib
import concurrent.futures as futures
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor

urls = ['http://speedtest.ftp.otenet.gr/files/test100k.db',
        'http://speedtest.ftp.otenet.gr/files/test1Mb.db',
        'http://speedtest.ftp.otenet.gr/files/test10Mb.db']


def download_file(url):
    filename = url.split('/')[-1]
    log(f'Starting download {url}')
    r = requests.get(url, stream=True)
    with open(filename, 'wb') as f:
        shutil.copyfileobj(r.raw, f)
    log(f'Finished download {url}')
    return filename


def sha256_file(filename, block_size=65536):
    sha256 = hashlib.sha256()
    log(f'Hashing file {filename}...')
    with open(filename, 'rb') as f:
        for block in iter(lambda: f.read(block_size), b''):
            sha256.update(block)
    log(f'Finished hashing file {filename}')
    return filename, sha256.hexdigest()


def parallel(n, fn, data):
    with ThreadPoolExecutor(max_workers=n) as exe:
        jobs = (exe.submit(fn, d) for d in data)
        for job in futures.as_completed(jobs):
            yield job.result()


def log(msg):
    print(f'{datetime.now()} {msg}')


def main():
    files = parallel(3, download_file, urls)
    hashes = parallel(2, sha256_file, files)
    for f, h in hashes:
        log(f'File: {f} -> Hash: {h}')


if __name__ == '__main__':
    main()

The idea of the ‘parallel’ generator is to build multi-threaded pipeline stages. The first stage (download_file) runs on 3 threads. As soon as each file has been downloaded, the second stage (sha256_file, on 2 threads) hashes it. This is the generated output:

2017-04-01 23:16:19.965946 Starting download http://speedtest.ftp.otenet.gr/files/test100k.db
2017-04-01 23:16:19.966431 Starting download http://speedtest.ftp.otenet.gr/files/test1Mb.db
2017-04-01 23:16:19.966931 Starting download http://speedtest.ftp.otenet.gr/files/test10Mb.db
2017-04-01 23:16:20.683722 Finished download http://speedtest.ftp.otenet.gr/files/test100k.db
2017-04-01 23:16:20.684722 Hashing file test100k.db...
2017-04-01 23:16:20.685256 Finished hashing file test100k.db
2017-04-01 23:16:23.600355 Finished download http://speedtest.ftp.otenet.gr/files/test1Mb.db
2017-04-01 23:16:23.600873 Hashing file test1Mb.db...
2017-04-01 23:16:23.605420 Finished hashing file test1Mb.db
2017-04-01 23:16:49.962977 Finished download http://speedtest.ftp.otenet.gr/files/test10Mb.db
2017-04-01 23:16:49.963463 Hashing file test10Mb.db...
2017-04-01 23:16:49.963975 File: test100k.db -> Hash: f627ca4c2c322f15db26152df306bd4f983f0146409b81a4341b9b340c365a16
2017-04-01 23:16:49.963975 File: test1Mb.db -> Hash: 30e14955ebf1352266dc2ff8067e68104607e750abb9d3b36582b8af909fcb58
2017-04-01 23:16:50.003094 Finished hashing file test10Mb.db
2017-04-01 23:16:50.003094 File: test10Mb.db -> Hash: e5b844cc57f57094ea4585e235f36c78c1cd222262bb89d53c94dcb4d6b3e55d

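According to the logs, stages one and two work as expected. However, the final loop ‘for f, h in hashes’ only receives the hashes after all downloads have finished. The hashes of test100k.db and test1Mb.db are printed at 23:16:49, right after the last download completes, even though they were computed seconds earlier. How can I make the loop process each result as soon as it becomes available?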
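The delay comes from futures.as_completed. CPython's implementation starts by turning its argument into a set (fs = set(fs)). In the second stage, the call as_completed(jobs) therefore drains the whole jobs generator first, and that generator in turn has to pull every file out of the first stage. Each hash is still submitted as soon as its download arrives, which explains the early ‘Hashing file’ lines in the log. But as_completed cannot yield anything until test10Mb.db has been downloaded.

One possible fix, sketched here under the assumption that an extra thread per stage is acceptable, is to:

1. Consume the input in a separate feeder thread.
2. Let every future put itself on a queue.Queue when it completes, via Future.add_done_callback.

This way the consumer never waits on the upstream stage. The _EndOfInput sentinel is a hypothetical name used only for this illustration. So are the fake_download / fake_hash / DELAYS stand-ins, which replace the real downloads so the example runs offline. None of them are part of the original code.

```python
import hashlib
import queue
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class _EndOfInput:
    """Hypothetical sentinel the feeder posts once `data` is exhausted."""

    def __init__(self, count, error=None):
        self.count = count  # how many futures were submitted in total
        self.error = error  # exception raised while iterating `data`, if any


def parallel(n, fn, data):
    """Apply fn to every item of data on n threads and yield each result as
    soon as it is ready, even while `data` is still producing items."""
    finished = queue.Queue()  # completed futures, plus one final sentinel

    def feed(exe):
        # Pull from `data` in a separate thread, so a slow upstream stage
        # never blocks the loop below from yielding finished results.
        count = 0
        try:
            for d in data:
                # When fn(d) completes, the future puts itself on the queue.
                exe.submit(fn, d).add_done_callback(finished.put)
                count += 1
        except Exception as exc:
            finished.put(_EndOfInput(count, exc))
        else:
            finished.put(_EndOfInput(count))

    with ThreadPoolExecutor(max_workers=n) as exe:
        threading.Thread(target=feed, args=(exe,), daemon=True).start()
        total, seen = None, 0
        # Stop only once the input is exhausted and every future has arrived.
        while total is None or seen < total:
            item = finished.get()
            if isinstance(item, _EndOfInput):
                if item.error is not None:
                    raise item.error
                total = item.count
            else:
                seen += 1
                yield item.result()  # re-raises if fn itself failed


# Hypothetical stand-ins for download_file / sha256_file, so the demo runs
# without network access: larger "files" take longer to download.
DELAYS = {'small': 0.1, 'medium': 0.3, 'large': 1.0}


def fake_download(name):
    time.sleep(DELAYS[name])
    return name


def fake_hash(name):
    time.sleep(0.05)
    return name, hashlib.sha256(name.encode()).hexdigest()


if __name__ == '__main__':
    start = time.monotonic()
    files = parallel(3, fake_download, ['small', 'medium', 'large'])
    for name, digest in parallel(2, fake_hash, files):
        print(f'{time.monotonic() - start:.2f}s {name} -> {digest[:12]}')
```

This version keeps the signature parallel(n, fn, data), so the question's main() should work with it unchanged. In the demo, the lines for small and medium should appear well before large finishes its simulated 1 s download. The cost is one extra feeder thread per stage. The feeder is also the only thread that advances the upstream generator, which is why iterating it outside the main thread is safe here.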
