請(qǐng)求示例
腳本代碼的樣例,這里提供了python的代碼示例。
1、打開單個(gè)環(huán)境
import requests
import sys
env_id = [2628] # 環(huán)境id
data = {"env_id": env_id}
res = requests.post("http://127.0.0.1:10086/api/open_env", json=data)
if res.status_code != 200:
print('環(huán)境打開失敗:{}'.format(res))
sys.exit()
open_res_json = res.json()
print('返回響應(yīng)數(shù)據(jù):{}'.format(open_res_json))
2、連續(xù)打開多個(gè)環(huán)境
import requests
import sys
import multiprocessing as mp
from time import sleep
def open_env(env_id):
data = {"env_id": env_id}
res = requests.post("http://127.0.0.1:10086/api/open_env", json=data)
if res.status_code != 200:
print('環(huán)境打開失敗:{}'.format(res))
sys.exit()
open_res_json = res.json()
print('返回響應(yīng)數(shù)據(jù):{}'.format(open_res_json))
if __name__ == '__main__':
env_ids = [2628, 2958] #多個(gè)環(huán)境id
procs = []
for env_id in env_ids:
env_proc = mp.Process(target=open_env, args=(env_id,))
procs.append(env_proc)
env_proc.start()
sleep(1)
for proc in procs:
proc.join()
3、打開單個(gè)環(huán)境并且操作瀏覽器打開新的網(wǎng)頁(yè)
import time
import sys
import requests
from selenium import webdriver
# 獲取webdriver
def get_driver_by_path(webdriver_path, port):
options = webdriver.ChromeOptions()
options.add_experimental_option("debuggerAddress", '127.0.0.1:' + str(port))
return webdriver.Chrome(webdriver_path, options=options)
def open_baidu(driver):
driver.get("https://www.baidu.com/") #打開新的網(wǎng)頁(yè)
time.sleep(1)
if __name__ == '__main__':
url = r'http://127.0.0.1:10086/api/open_env' # 打開環(huán)境
open_data = {"env_id": 2587} # 填寫參數(shù),環(huán)境id可從創(chuàng)建環(huán)境或環(huán)境列表接口獲取
open_res = requests.post(url, json=open_data).json()
if open_res['code'] != 200:
print('環(huán)境打開失敗:%s' % open_res)
sys.exit()
webdriver_path = open_res['data']['webdriver'] # 獲取webdriver路徑
debugging_port = open_res['data']['debuggingport'] # 獲取調(diào)試端口
# 獲取webdriver
driver = get_driver_by_path(webdriver_path, debugging_port) # 填寫打開環(huán)境返回的webdriver和debuggingPort參數(shù)
# 運(yùn)行腳本
open_baidu(driver)
4、打開賬號(hào)下所有環(huán)境,并且讓所有環(huán)境打開一個(gè)網(wǎng)頁(yè)
import os
from multiprocessing import Process, current_process
import requests
import sys
from selenium import webdriver
from time import sleep
HOST = 'http://127.0.0.1:10086'
OPEN_ENV = HOST + '/api/open_env'
CLOSR_ENV = HOST + '/api/close_env'
ENV_LIST = HOST + '/api/env_list'
ENV_STATUS = HOST + '/api/env_status'
class MultiEnv(Process):
def __init__(self, env_id):
# 必須調(diào)用父類的init方法
super(MultiEnv, self).__init__()
self.env_id = env_id
def run(self):
data = {"env_id": self.env_id}
try:
# self.pid
proc_id = os.getpid()
# 獲取子進(jìn)程名稱
# self.name
proc_name = current_process().name
print('proc_id:{0} proc_name:{1}'.format(proc_id, proc_name))
# 請(qǐng)求/api/env_list 得到所有環(huán)境信息
open_res_json = MultiEnv.get_res(OPEN_ENV, dat a_json=data)
# 獲取webdriver路徑
webdriver_path = open_res_json['data']['webdriver']
# 獲取調(diào)試端口
debugging_port = open_res_json['data']['debuggingport']
options = webdriver.ChromeOptions()
options.add_experimental_option("debuggerAddress", '127.0.0.1:' + str(debugging_port))
# 得到driver對(duì)象
driver_yangtao_env = webdriver.Chrome(webdriver_path, options=options)
# 這里就可以寫業(yè)務(wù)代碼
sleep(1)
driver_yangtao_env.get("https://www.baidu.com")
sleep(500)
finally:
requests.post(CLOSR_ENV, json=data)
# 請(qǐng)求接口的工具方法
@classmethod
def get_res(cls, url, data_json):
res = requests.post(url, json=data_json)
if res.status_code != 200:
print('環(huán)境打開失敗:{}'.format(res))
sys.exit()
open_res_json = res.json()
print('返回響應(yīng)數(shù)據(jù):{}'.format(open_res_json))
return open_res_json
# 請(qǐng)求/api/env_list 得到所有環(huán)境信息
@classmethod
def get_env_ids(cls):
env_ids = []
data = {
}
env_res = MultiEnv.get_res(ENV_LIST, data_json=data)
for env_data in env_res['data']:
env_ids.append(env_data['id'])
return env_ids
if __name__ == '__main__':
"""
1、調(diào)用接口/api/env_list獲得所有可用的環(huán)境id
2、使用進(jìn)程分別打開環(huán)境
"""
procs = []
# 得到賬號(hào)內(nèi)所有可用環(huán)境id
for env_id in MultiEnv.get_env_ids():
# 根據(jù)每個(gè)環(huán)境id,分別創(chuàng)建子進(jìn)程
proc = MultiEnv(env_id)
sleep(5)
procs.append(proc)
# 啟動(dòng)子進(jìn)程,啟動(dòng)一個(gè)新進(jìn)程實(shí)際就是執(zhí)行本進(jìn)程對(duì)應(yīng)的run方法
proc.start()
# join方法會(huì)讓父進(jìn)程等待子進(jìn)程結(jié)束后再執(zhí)行
for proc in procs:
proc.join()
print("Done.")