Source code for python_lib_examples.async_files
import asyncio
from concurrent.futures import ThreadPoolExecutor
# Define an executor for running blocking I/O in a thread
executor = ThreadPoolExecutor()
# Asynchronous function to read a file
[docs]
async def async_read_file(file_path):
loop = asyncio.get_running_loop()
# Use run_in_executor to run the blocking I/O operation in a separate thread
with open(file_path, 'r') as file:
return await loop.run_in_executor(executor, file.read)
# Asynchronous function to write to a file
[docs]
async def async_write_file(file_path, content):
loop = asyncio.get_running_loop()
# Use run_in_executor to run the blocking I/O operation in a separate thread
with open(file_path, 'w') as file:
return await loop.run_in_executor(executor, file.write, content)
# Testing the async file I/O functions
[docs]
async def main():
# Writing to a file
await async_write_file('example.txt', 'Hello, Async World!')
print("File written successfully!")
# Reading from a file
content = await async_read_file('example.txt')
print(f"File content: {content}")
if __name__ == '__main__':
# Run the main function
asyncio.run(main())