并发调用异步函数
返回一个用一个
在某些情况下,我们需要同时调用多个函数,并且每有一个函数完成任务,就立即进行对应的处理,
以此减少用户的等待时间。
定义多个异步函数
import httpx
def random_str(random_length=7, chars='AaBbCcDdEeFfGgHhIiJjKkLlMmNnOoPpQqRrSsTtUuVvWwXxYyZz0123456789'):
    """Return a random string of ``random_length`` characters drawn from ``chars``.

    Args:
        random_length: Number of characters to generate. ``0`` yields ``''``.
        chars: Alphabet to draw from; each position is picked independently.

    Returns:
        The generated string.
    """
    # Imported locally so the function works even when the surrounding
    # module does not import ``random`` at the top level.
    import random

    last_index = len(chars) - 1
    # Pick one character per position; join once instead of repeated ``+=``.
    return ''.join(
        chars[random.randint(0, last_index)] for _ in range(random_length)
    )
async def get_results(url,proxy=None):
    """Call an image-generation endpoint and download every image it returns.

    The endpoint at ``url`` is expected to answer with JSON containing a
    ``"data"`` list whose items each carry a ``"url"`` key. Every image is
    saved as ``data/pictures/cache/<random>.png``.

    Args:
        url: Full request URL of the generation endpoint.
        proxy: Optional proxy URL used for both http and https traffic.

    Returns:
        List of local file paths of the images that were saved. Empty when
        the initial request fails or no image could be downloaded.

    Raises:
        Whatever ``response.json()`` or the ``["data"]`` lookup raises when
        the endpoint answers with an unexpected payload (unchanged from the
        original behavior).
    """
    import os

    cache_dir = "data/pictures/cache"
    # NOTE(review): newer httpx releases replace ``proxies=`` with
    # ``proxy=``/``mounts=`` — confirm against the installed version.
    proxies = {"http://": proxy, "https://": proxy} if proxy is not None else None

    try:
        async with httpx.AsyncClient(proxies=proxies,timeout=100) as client:
            response = await client.get(url)
    except Exception:
        # ``Exception`` rather than a bare ``except`` so task cancellation
        # (asyncio.CancelledError) and KeyboardInterrupt still propagate.
        return []

    # Kept from the original. NOTE(review): whether an httpx response can
    # ever be falsy is not visible here — confirm.
    if not response:
        return []

    paths = []
    for item in response.json()["data"]:
        try:
            image_url = item["url"]
            async with httpx.AsyncClient(proxies=proxies,timeout=100) as client:
                image = await client.get(image_url)
            # Do not store an error page as a ``.png`` file.
            image.raise_for_status()
            # Without the directory every write would fail and be skipped.
            os.makedirs(cache_dir, exist_ok=True)
            p = f"{cache_dir}/{random_str(10)}.png"
            with open(p, "wb") as f:
                f.write(image.content)
            paths.append(p)
        except Exception:
            # Best effort: one broken image must not discard the others.
            continue
    return paths
async def ideo_gram(prompt,proxy=None):
    """Generate images for ``prompt`` through the ideogram endpoint.

    Args:
        prompt: Text description of the image.
        proxy: Optional proxy URL forwarded to ``get_results``.

    Returns:
        Whatever ``get_results`` returns for the built URL.
    """
    from urllib.parse import quote

    # Escape the prompt so characters such as ``&``, ``#`` or ``+`` cannot
    # break or truncate the query string.
    url = f"https://apiserver.alcex.cn/ideogram/generate-image?prompt={quote(str(prompt), safe='')}"
    return await get_results(url, proxy)
async def bing_dalle3(prompt,proxy=None):
    """Generate images for ``prompt`` through the dall-e-3 endpoint.

    Args:
        prompt: Text description of the image.
        proxy: Optional proxy URL forwarded to ``get_results``.

    Returns:
        Whatever ``get_results`` returns for the built URL.
    """
    from urllib.parse import quote

    # Escape the prompt so characters such as ``&``, ``#`` or ``+`` cannot
    # break or truncate the query string.
    url = f"https://apiserver.alcex.cn/dall-e-3/generate-image?prompt={quote(str(prompt), safe='')}"
    return await get_results(url, proxy)
async def flux_speed(prompt,proxy=None):
    """Generate images for ``prompt`` through the flux-speed endpoint.

    Args:
        prompt: Text description of the image.
        proxy: Optional proxy URL forwarded to ``get_results``.

    Returns:
        Whatever ``get_results`` returns for the built URL.
    """
    from urllib.parse import quote

    # Escape the prompt so characters such as ``&``, ``#`` or ``+`` cannot
    # break or truncate the query string.
    url = f"https://apiserver.alcex.cn/flux-speed/generate-image?prompt={quote(str(prompt), safe='')}"
    return await get_results(url, proxy)
async def recraft_v3(prompt,proxy=None):
    """Generate images for ``prompt`` through the recraft-v3 endpoint.

    Args:
        prompt: Text description of the image.
        proxy: Optional proxy URL forwarded to ``get_results``.

    Returns:
        Whatever ``get_results`` returns for the built URL.
    """
    from urllib.parse import quote

    # Escape the prompt so characters such as ``&``, ``#`` or ``+`` cannot
    # break or truncate the query string.
    url = f"https://apiserver.alcex.cn/recraft-v3/generate-image?prompt={quote(str(prompt), safe='')}"
    return await get_results(url, proxy)
async def flux_ultra(prompt,proxy=None):
    """Generate images for ``prompt`` through the flux-pro.ultra endpoint.

    Args:
        prompt: Text description of the image.
        proxy: Optional proxy URL forwarded to ``get_results``.

    Returns:
        Whatever ``get_results`` returns for the built URL.
    """
    from urllib.parse import quote

    # Escape the prompt so characters such as ``&``, ``#`` or ``+`` cannot
    # break or truncate the query string.
    url = f"https://apiserver.alcex.cn/flux-pro.ultra/generate-image?prompt={quote(str(prompt), safe='')}"
    return await get_results(url, proxy)
如上所示,我们定义了五个 AI 绘画接口的调用函数。
# NOTE(review): stray fragment — these look like entries of a list of
# coroutine calls; the commented-out entries are backends left disabled.
ideo_gram(prompt, proxy),
bing_dalle3(prompt, proxy),
#flux_speed(prompt, proxy),
#recraft_v3(prompt, proxy),
flux_ultra(prompt, proxy),
实现需求
在实际应用中,我们希望 bot 能同时调用这五个函数,并且每有一个完成,就立即把结果返回给用户。那么就可以这么写:
functions = [
ideo_gram(prompt, proxy),
bing_dalle3(prompt, proxy),
flux_speed(prompt, proxy),
recraft_v3(prompt, proxy),
flux_ultra(prompt, proxy),
]
for future in asyncio.as_completed(functions):
try:
result = await future
print(f"Task succeed: {e}")
except Exception as e:
print(f"Task failed: {e}")
每有一个函数调用完成,就会立即打印,而不是等待所有函数调用完成才打印结果。
融入bot代码
#该导入什么自己看着导。
async def call_text2img(bot,event,config,prompt):
    """Run the enabled text-to-image backends concurrently and forward results.

    Each backend's images are sent to the chat as soon as that backend
    finishes, without waiting for the others.

    Args:
        bot: Bot object providing ``logger`` and an awaitable ``send``.
        event: Event that triggered the request; passed back to ``bot.send``.
        config: Configuration object; the proxy is read from
            ``config.api["proxy"]["http_proxy"]``.
        prompt: Text description of the image to draw.
    """
    bot.logger.info(f"Received text2img prompt: {prompt}")
    await bot.send(event, "正在绘制,请稍候...")
    proxy = config.api["proxy"]["http_proxy"]
    functions = [
        # ideo_gram(prompt, proxy),   # optionally disabled
        bing_dalle3(prompt, proxy),
        # flux_speed(prompt, proxy),  # optionally disabled
        # recraft_v3(prompt, proxy),  # optionally disabled
        flux_ultra(prompt, proxy),
    ]
    for future in asyncio.as_completed(functions):
        try:
            result = await future
            # Truthiness check: the original ``result is not []`` compared
            # identity with a fresh list and was therefore always True, so
            # empty results still produced an (empty) message.
            if not result:
                continue
            sendMes = [Node(content=[Image(file=r)]) for r in result]
            await bot.send(event, sendMes)
        except Exception as e:
            # One failing backend must not stop the remaining ones.
            bot.logger.error(f"Task failed: {e}")
def main(bot,config):
    """Register the group-message handler that starts drawing on the "画 " prefix.

    Args:
        bot: Bot object whose ``on`` decorator registers event handlers.
        config: Configuration object forwarded to ``call_text2img``.
    """
    @bot.on(GroupMessageEvent)
    async def bing_dalle3_draw(event):
        prefix = "画 "
        raw = str(event.raw_message)
        if not raw.startswith(prefix):
            return
        # Strip only the leading command. ``split(prefix)[1]`` would cut the
        # prompt at any later occurrence of the prefix inside the text.
        prompt = raw[len(prefix):]
        await call_text2img(bot, event, config, prompt)