|
- # -*- coding: UTF-8 -*-
- import requests
- import sys
- import json
- import random
- import string
- import os
- from csv import reader
- from datetime import datetime
- from user_test import get_user, get_all_users, delete_user
- from group_test import get_group_by_name, get_group, create_group, add_user_to_group
- from exam_test import create_exam, create_question, get_question, add_question_to_exam
- from category_test import create_material, get_material, create_category, get_category, add_material_to_category, update_category, create_question_category, get_question_category, questions_in_category, add_question_to_question_category, update_question_category, show_root_categories
- from learningmaterial_test import add_comment
- USERNAME_RANDOM_LENGTH = 10
- from constants import WEBSITE_PREFIX
# Command-line switches: each bare word enables one step of the script.
# All flags stay False when the script is started without extra arguments.
_cli_words = sys.argv if len(sys.argv) > 1 else []
override = 'override' in _cli_words
want_material = 'material' in _cli_words
want_user = 'user' in _cli_words
want_question = 'question' in _cli_words
def _random_string(length):
    """Return a string of ``length`` randomly chosen ASCII letters."""
    alphabet = string.ascii_letters
    picked = []
    for _ in range(length):
        picked.append(random.choice(alphabet))
    return ''.join(picked)
def _text(resp):
    """Parse the JSON document held in ``resp.text`` and return the result."""
    body = resp.text
    return json.loads(body)
# Create `number` users (10 by default) whose names start with namePrefix and
# whose password is `password`.
def add_dummy_users(namePrefix='testuser-', password='test', number=10):
    """Register demo users and place each one in a randomly chosen group.

    A new group tree is created first through ``add_groups()``. Each user name
    is ``namePrefix`` followed by ``USERNAME_RANDOM_LENGTH`` random letters.
    The names of the created users are also stored in the module-level
    ``all_users`` list, which ``randomly_pick_user()`` reads.

    Returns:
        The list of created user names.
    """
    global all_users

    group_ids = add_groups()
    nick_pool = ['小强', '小张', '小李', '大王']
    given_pool = ['一', '二', '三', '四', '五']
    family_pool = ['张', '王', '李', '杜', '靳']

    created = []
    for index in range(number):
        login = namePrefix + _random_string(USERNAME_RANDOM_LENGTH)
        profile = {
            'userName': login,
            'password': password,
            'nickName': nick_pool[index % len(nick_pool)],
            'studyId': 'U0000' + str(index),
            'email': 'test@gmail.com',
            'firstName': given_pool[index % len(given_pool)],
            'lastName': family_pool[index % len(family_pool)],
            'phone': '13060901234',
            'birthday': '1999-01-01',
        }
        register_user(profile)
        # Read the account back and hand that record to the group call.
        fetched = get_user(login)
        target_group = group_ids[random.randrange(len(group_ids))]
        stored = _text(fetched)
        add_user_to_group(target_group, stored)
        created.append(stored['userName'])

    print('created ' + str(len(created)) + ' users: ' + ','.join(created))
    all_users = created
    return created
# Create a two-level group tree with 3 second-level groups (by default):
#                    orgtest-level1
#   orgtest-level2-a  orgtest-level2-b  orgtest-level2-c
def add_groups(namePrefix='orgtest-', number=3):
    """Create one level-1 group and ``number`` level-2 groups beneath it.

    Every group name gets three random letters appended.

    Returns:
        The ids of the level-2 groups, taken from the create responses.
    """
    parent_name = namePrefix + 'level1-' + _random_string(3)
    create_group({'name': parent_name})
    # The parent's id is obtained by looking the new group up by name.
    parent = _text(get_group_by_name(parent_name))

    child_ids = []
    for _ in range(number):
        payload = {
            'name': namePrefix + 'level2-' + _random_string(3),
            'parentGroupId': parent['id'],
        }
        created = create_group(payload)
        child_ids.append(_text(created)['id'])

    print('created ' + str(len(child_ids)) + ' groups: ' + ','.join(child_ids))
    return child_ids
-
def randomly_pick_user():
    """Return one randomly selected entry of the module-level ``all_users``."""
    pool = all_users
    return pool[random.randrange(len(pool))]
def create_exams(namePrefix='测试考试-', number=2, tags=None):
    """Create ``number`` exams and return their ids.

    Args:
        namePrefix: Start of every exam name; three random letters are appended.
        number: How many exams to create.
        tags: Tags stored on every exam. ``None`` (the default) means no tags.

    Returns:
        The ids read from the create-exam responses, in creation order.
    """
    # Build the empty list per call: a `tags=[]` default is one shared object
    # that would be reused (and could be mutated) across calls.
    exam_tags = [] if tags is None else tags
    exams = []
    for _ in range(number):
        new_exam = {
            'name': namePrefix + _random_string(3),
            'description': '可以回家做',
            'duration': random.randint(0, 863) * 100,  # seconds
            'tags': exam_tags,
            'startTime': '2021-05-01T10:00:00',
        }
        resp = create_exam(new_exam)
        exams.append(_text(resp)['id'])
    print('created ' + str(len(exams)) + ' exams: ' + ','.join(exams))
    return exams
# Question CSV layout used below (one question per row):
#   0 number, 1 content, 2 type code, 3-6 answer options, 7 final answer,
#   8 answer analysis, 9 keyword, 10 level, 11 job title.
_QUESTION_CSV_COLUMNS = 12

# Type code -> question type name; any other code means single choice.
_QUESTION_TYPE_BY_CODE = {'2': 'DuoXuan', '3': 'PanDuan'}

# Job title in the CSV -> tag stored on the question.
_QUESTION_TAG_BY_JOB_TITLE = {
    '线路工': 'XianLuGong',
    '桥隧工': 'QiaoSuiGong',
    '大型线路机械司机': 'DaXingXianLuJiXieSiJi',
    '测量工': 'CeLiangGong',
}


def _question_from_row(row):
    """Build a question payload (without ``userId``) from one CSV data row.

    Rows shorter than the full layout are padded with empty cells, so
    trailing optional columns may be absent. Returns ``None`` when the
    content, every answer option, or the final answer is empty.
    """
    cells = list(row) + [''] * (_QUESTION_CSV_COLUMNS - len(row))
    content = cells[1]
    answers = [cell for cell in cells[3:7] if cell]
    final_answer = cells[7]
    if not (content and answers and final_answer):
        return None

    job_title = cells[11]
    tags = []
    if job_title in _QUESTION_TAG_BY_JOB_TITLE:
        tags.append(_QUESTION_TAG_BY_JOB_TITLE[job_title])
    return {
        'content': content,
        'answers': answers,
        'finalAnswer': [final_answer],
        'keyword': cells[9],
        'type': _QUESTION_TYPE_BY_CODE.get(cells[2], 'DanXuan'),
        'questionLevel': cells[10],
        'tags': tags,
        'answerAnalysis': cells[8],
    }


def _import_question_file(file_name):
    """Load one CSV from ``./demo-questions/`` and return the created ids.

    The file name must look like ``<prefix>-<tag>-...``: six exams named
    after prefix and tag are created for the file and every question is
    attached to one of them at random.
    """
    name_parts = file_name.split('-')
    if len(name_parts) < 2:
        # Without "<prefix>-<tag>" there is nothing to name the exams after.
        print('Skip ' + file_name + ': file name has no "-" separated tag')
        return []
    exam_prefix, exam_tag = name_parts[0], name_parts[1]

    created_ids = []
    # newline='' lets the csv module handle line endings inside quoted fields.
    with open('./demo-questions/' + file_name, encoding="UTF-8", newline='') as csv_file:
        print('loading ' + file_name + ' to db')
        exams = create_exams(namePrefix=exam_prefix + '-' + exam_tag, number=6, tags=[exam_tag])
        for row in reader(csv_file):
            # csv yields [] for blank lines; also skip the header row ('题号')
            # and rows without a question number.
            if not row or row[0] == '题号' or not row[0]:
                continue
            payload = _question_from_row(row)
            if payload is None:
                print('Skip due to required info missing')
                continue

            question = {'userId': randomly_pick_user()}
            question.update(payload)
            question_id = _text(create_question(question))['id']
            random_exam_id = random.choice(exams)
            # Roughly 1 question in 101 is tagged with today's date
            # (the original comment calls this the 今日必练 tag).
            if random.randint(0, 100) == 3:
                add_tag(datetime.today().strftime('%Y%m%d'), questionId=question_id)
            # add newly created question to a random exam
            add_question_to_exam(random_exam_id, [{'questionId': question_id}])
            created_ids.append(question_id)
    return created_ids


def create_questions(namePrefix='qtest-', number=10, from_file=False):
    """Create demo questions and return their ids.

    With ``from_file=True`` every file in ``./demo-questions/`` is imported
    as a CSV (see ``_import_question_file``); ``namePrefix`` and ``number``
    are not used. Otherwise ``number`` pairs of generated questions are
    created (one single-choice, one fill-in-the-blank), each attached to a
    random exam from ``create_exams()``; the fill-in-the-blank question is
    also put into a random question category.

    A set of question categories is created in both modes.

    Returns:
        The ids of all created questions.
    """
    exams = [] if from_file else create_exams()
    questions = []
    question_categories = create_question_categories()

    if from_file:
        for file_name in os.listdir('./demo-questions/'):
            questions.extend(_import_question_file(file_name))
    else:
        for i in range(number):
            question = {
                'userId': randomly_pick_user(),
                'content': namePrefix + '天上什么星星最亮?-' + str(i),
                'answers': ['太阳', '小天狼星', '北极星'],
                'finalAnswer': ['小天狼星'],
                'type': 'DanXuan',  # single choice
            }
            created = _text(create_question(question))
            # add newly created question to a random exam
            add_question_to_exam(random.choice(exams), [created])
            questions.append(created['id'])

            tiankong_question = {
                'userId': randomly_pick_user(),
                'content': namePrefix + '天上$PH$最亮?-' + str(i),
                'finalAnswer': ['小天狼星'],
                'type': 'TianKong',  # fill in the blank
            }
            created = _text(create_question(tiankong_question))
            # add newly created question to a random exam
            add_question_to_exam(random.choice(exams), [created])
            add_question_to_question_category(random.choice(question_categories), created)
            questions.append(created['id'])

    print('created ' + str(len(questions)) + ' questions')
    return questions
def create_question_categories(namePrefix='知识类-', number=3):
    """Create a root question category with ``number`` children under it.

    The root is named ``namePrefix`` plus three random letters; the children
    cycle through a fixed list of name stems, each with three random letters
    appended, and are linked to the root by updating their ``parentId``.

    Returns:
        The ids of the child categories.
    """
    root = create_question_category(namePrefix + _random_string(3))
    root_id = _text(root)['id']

    stems = ['科普类-', '技术类-', '历史类-', '天文类-', '兴趣类-']
    child_ids = []
    for index in range(number):
        stem = stems[index % len(stems)]
        created = create_question_category(stem + _random_string(3))
        new_id = _text(created)['id']
        update_question_category(new_id, {'parentId': root_id})
        child_ids.append(new_id)

    print('created ' + str(len(child_ids)) + ' question categories: ' + ','.join(child_ids))
    return child_ids
def create_categories(namePrefix='testCategory-', number=3):
    """Create a root material category with ``number`` children under it.

    Names are ``namePrefix`` + ``'root-'`` or ``'child-'`` + three random
    letters; each child is linked to the root by updating its ``parentId``.

    Returns:
        The ids of the child categories.
    """
    root = create_category(namePrefix + 'root-' + _random_string(3))
    parent_id = _text(root)['id']

    child_ids = []
    for _ in range(number):
        created = create_category(namePrefix + 'child-' + _random_string(3))
        new_id = _text(created)['id']
        update_category(new_id, {'parentId': parent_id})
        child_ids.append(new_id)

    print('created ' + str(len(child_ids)) + ' categories: ' + ','.join(child_ids))
    return child_ids
def create_2_levels_categories(root_level, second_levels):
    """Create category ``root_level`` and one child per name in ``second_levels``.

    Each child is created on its own and then linked to the root by updating
    its ``parentId``.

    Returns:
        The ids of the child categories, in the order of ``second_levels``.
    """
    root = create_category(root_level)
    root_id = _text(root)['id']

    child_ids = []
    for name in second_levels:
        created = create_category(name)
        new_id = _text(created)['id']
        update_category(new_id, {'parentId': root_id})
        child_ids.append(new_id)
    return child_ids
def create_specific_categories():
    """Create the fixed two-level category tree unless root categories exist.

    Returns:
        When no root category exists yet: the ids of all newly created
        second-level categories. Otherwise nothing is created and the ids of
        the existing root categories are returned.
    """
    existing_roots = _text(show_root_categories())
    if len(existing_roots) != 0:
        print('Skip creating specific categories')
        return [root['id'] for root in existing_roots]

    # (root category, its second-level categories)
    tree = (
        ('思政', ['党规党纪', '思想理论', '党史故事']),
        ('安全', ['劳动安全', '作业安全', '事故案例']),
        ('理论', ['基础理论', '专业理论']),
        ('实作', ['作业指导书', '作业技巧', '应急处置']),
        ('教材', ['文字', '音像', '课件']),
        ('法纪', ['国家法律', '法纪法规', '交通规则']),
        ('健康', ['健康知识', '健康养生', '心理辅导']),
        ('集萃', ['旅游', '摄影','诗词', '名篇']),
    )
    created = []
    for root_name, child_names in tree:
        created.extend(create_2_levels_categories(root_name, child_names))
    return created
def read_material_from_file(file_name):
    """Read a learning material from ``./demo-materials/<file_name>``.

    Lines are stripped and blank lines are dropped. The first remaining line
    is the title; every later line becomes part of the content, each
    terminated by ``'\\n'``.

    Returns:
        A ``(title, content)`` tuple; both are ``''`` when the file has no
        non-blank line.
    """
    # `with` closes the handle even if reading or decoding fails; the
    # original left the file open.
    with open('./demo-materials/' + file_name, encoding="UTF-8") as material_file:
        stripped = [raw_line.strip() for raw_line in material_file]

    lines = [line for line in stripped if line]
    if not lines:
        return '', ''
    title = lines[0]
    content = ''.join(line + '\n' for line in lines[1:])
    return title, content
def create_learning_materials(namePrefix='测试资料-', number=20, from_file=False):
    """Create demo learning materials, categorise them and add comments.

    With ``from_file=True`` one article is created per file in
    ``./demo-materials/`` (title and contents come from
    ``read_material_from_file``) and filed under a random category from
    ``create_specific_categories()``; ``namePrefix`` and ``number`` are not
    used. Otherwise ``number`` articles with generated names and contents are
    created and filed under a random category from ``create_categories()``.

    Every material receives the comments produced by ``create_comments``.

    Returns:
        The ids of the created materials.
    """
    materials = []
    if from_file:
        files = os.listdir('./demo-materials/')
        categories = create_specific_categories()
        for file_name in files:
            title, content = read_material_from_file(file_name)
            # Parse the response once; both the id and the whole record are used.
            material = _text(create_material({
                'name': title,
                'userId': randomly_pick_user(),
                'type': 'ARTICLE',
                'contents': content,
            }))
            material_id = material['id']
            # Roughly 1 material in 11 is tagged with today's date
            # (the original comment calls this the 今日必练 tag).
            if random.randint(0, 10) == 3:
                add_tag(datetime.today().strftime('%Y%m%d'), materialId=material_id)
            materials.append(material_id)
            add_material_to_category(random.choice(categories), material)
            create_comments(material_id)
    else:
        categories = create_categories()
        for _ in range(number):
            material = _text(create_material({
                'name': namePrefix + _random_string(5),
                'userId': randomly_pick_user(),
                'description': '好看的资料',
                'type': 'ARTICLE',
                'contents': '十年高考三年模拟-' + _random_string(20),
                'links': [_random_string(3), _random_string(3), _random_string(3)],
            }))
            material_id = material['id']
            materials.append(material_id)
            add_material_to_category(random.choice(categories), material)
            create_comments(material_id)

    print('created ' + str(len(materials)) + ' materials: ' + ','.join(materials))
    return materials
def create_comments(materialId, namePrefix='[自动生成]', number=5):
    """Post ``number`` canned comments on material ``materialId``.

    Each comment is authored by a randomly picked user and its text cycles
    through a fixed list. ``namePrefix`` is accepted but not used.
    """
    canned = ['我去,这也太酷了!', '我太爱学这个了,老牛了', '这太有意思了,收藏了', '好文,顶一个', '我踩']
    for index in range(number):
        author = randomly_pick_user()
        body = canned[index % len(canned)]
        add_comment(materialId, {'userId': author, 'content': body})
    print('created ' + str(number) + ' comments to material: ' + materialId)
def register_user(userData):
    """POST ``userData`` as JSON to ``/user/register`` and return the response."""
    return requests.post(WEBSITE_PREFIX + '/user/register', json=userData)
def _tag_targets(materialId=None, questionId=None, examId=None):
    """Return the query parameters naming what a tag applies to.

    Only ids that are set (truthy) are included.
    """
    candidates = (
        ('material', materialId),
        ('question', questionId),
        ('exam', examId),
    )
    return {key: value for key, value in candidates if value}


def add_tag(tag, materialId=None, questionId=None, examId=None):
    """POST to ``/tags/<tag>`` to attach ``tag`` to the given ids.

    The ids travel as ``material`` / ``question`` / ``exam`` query
    parameters. They are passed through ``params`` so that several ids are
    joined with ``&`` and encoded properly; plain string concatenation
    produced a malformed query when more than one id was given.

    Returns:
        The ``requests`` response.
    """
    url = WEBSITE_PREFIX + '/tags/' + tag
    return requests.post(url, params=_tag_targets(materialId, questionId, examId))


def remove_tag(tag, materialId=None, questionId=None, examId=None):
    """Send DELETE to ``/tags/<tag>`` to detach ``tag`` from the given ids.

    Query parameters are built exactly as in ``add_tag``.

    Returns:
        The ``requests`` response.
    """
    url = WEBSITE_PREFIX + '/tags/' + tag
    return requests.delete(url, params=_tag_targets(materialId, questionId, examId))
def get_tag(tag):
    """GET ``/tags/<tag>`` and return the response."""
    return requests.get(WEBSITE_PREFIX + '/tags/' + tag)
def delete_all_exams():
    """Send DELETE to ``/admin/exams`` and return the response."""
    return requests.delete(WEBSITE_PREFIX + '/admin/exams')
def delete_all_questions():
    """Send DELETE to ``/admin/questions`` and return the response."""
    return requests.delete(WEBSITE_PREFIX + '/admin/questions')
def delete_all_categories():
    """Send DELETE to ``/admin/categories`` and return the response."""
    return requests.delete(WEBSITE_PREFIX + '/admin/categories')
def delete_all_tags():
    """Send DELETE to ``/admin/tags`` and return the response."""
    return requests.delete(WEBSITE_PREFIX + '/admin/tags')
def delete_all_materials():
    """Send DELETE to ``/admin/materials`` and return the response."""
    return requests.delete(WEBSITE_PREFIX + '/admin/materials')
def delete_all_comments():
    """Send DELETE to ``/admin/comments`` and return the response."""
    return requests.delete(WEBSITE_PREFIX + '/admin/comments')
# ---- script body: runs the steps selected on the command line ----

# 'override': wipe everything on the server before seeding.
if override:
    # Kept in its own name: these accounts are about to be deleted, so they
    # must not end up in `all_users`, which later steps pick authors from.
    users_to_delete = json.loads(get_all_users().text)
    for user in users_to_delete:
        delete_user(user['userName'])
    print('deleted all users')
    delete_all_exams()
    print('deleted all exams')
    delete_all_questions()
    print('deleted all questions')
    delete_all_categories()
    print('deleted all categories')
    delete_all_tags()
    print('deleted all tags')
    delete_all_materials()
    print('deleted all materials')
    delete_all_comments()
    print('deleted all comments')

# 'user': create demo users (this also fills `all_users` with their names).
if want_user:
    add_dummy_users()
else:
    print('Skip creating users')
    if want_question or want_material:
        # The steps below call randomly_pick_user(), which reads `all_users`.
        # No users were created in this run, so use the names of the accounts
        # currently on the server instead of leaving the list undefined.
        all_users = [user['userName'] for user in json.loads(get_all_users().text)]

# 'question': import questions from ./demo-questions/.
if want_question:
    create_questions(from_file=True)
else:
    print('Skip creating questions')

# 'material': import learning materials from ./demo-materials/.
if want_material:
    create_learning_materials(from_file=True)
else:
    print('Skip creating materials')
|