# -*- coding: UTF-8 -*- import requests import sys import json import random import string import os from csv import reader from datetime import datetime from user_test import get_user, get_all_users, delete_user from group_test import get_group_by_name, get_group, create_group, add_user_to_group from exam_test import create_exam, create_question, get_question, add_question_to_exam from category_test import create_material, get_material, create_category, get_category, add_material_to_category, update_category, create_question_category, get_question_category, questions_in_category, add_question_to_question_category, update_question_category, show_root_categories from learningmaterial_test import add_comment USERNAME_RANDOM_LENGTH = 10 from constants import WEBSITE_PREFIX override = False want_material = False want_user = False want_question = False if len(sys.argv) > 1: for arg in sys.argv: if arg == 'override': override = True if arg == 'material': want_material = True if arg == 'user': want_user = True if arg == 'question': want_question = True def _random_string(length): letters = string.ascii_letters return ''.join(random.choice(letters) for i in range(length)) def _text(resp): return json.loads(resp.text) # 创建 10 个 前缀为namePrefix 密码为password的 用户 def add_dummy_users(namePrefix = 'testuser-', password = 'test', number = 10): groups = add_groups() users = [] nicknames = ['小强', '小张', '小李', '大王'] firstNames = ['一', '二', '三', '四', '五'] lastNames = ['张', '王', '李', '杜', '靳'] for i in range(number): user_name = namePrefix + _random_string(USERNAME_RANDOM_LENGTH) # register_user(user_name, password) new_user = { 'userName': user_name, 'password': password, 'nickName': nicknames[i % len(nicknames)], 'studyId': 'U0000' + str(i), 'email': 'test@gmail.com', 'firstName': firstNames[i % len(firstNames)], 'lastName': lastNames[i % len(lastNames)], 'phone': '13060901234', 'birthday': '1999-01-01', } register_user(new_user) user = get_user(user_name) random_group = groups[random.randint(0, len(groups)-1)] add_user_to_group(random_group, _text(user)) users.append(_text(user)['userName']) print('created ' + str(len(users)) + ' users: ' + ','.join(users)) global all_users all_users = users return users # 创建 3个二级的group # orgtest-level1 # orgtest-level2-a orgtest-level2-b orgtest-level2-c def add_groups(namePrefix = 'orgtest-', number=3): # creating level1 group level1_name = namePrefix + 'level1-' + _random_string(3) level1_group_body = { 'name': level1_name } create_group(level1_group_body) level1_group = _text(get_group_by_name(level1_name)) groups = [] for i in range(number): level2_group_body = { 'name': namePrefix + 'level2-' + _random_string(3), 'parentGroupId': level1_group['id'] } resp = create_group(level2_group_body) groupId = _text(resp)['id'] groups.append(groupId) print ('created ' + str(len(groups)) + ' groups: ' + ','.join(groups)) return groups def randomly_pick_user(): global all_users rand_index = random.randint(0, len(all_users) - 1) return all_users[rand_index] def create_exams(namePrefix='测试考试-', number=2, tags=[]): exams = [] for i in range(number): new_exam = { 'name': namePrefix + _random_string(3), 'description': '可以回家做', 'duration': random.randint(0, 863) * 100, # seconds 'tags': tags, 'startTime': '2021-05-01T10:00:00' } resp = create_exam(new_exam) exams.append(_text(resp)['id']) print ('created ' + str(len(exams)) + ' exams: ' + ','.join(exams)) return exams def create_questions(namePrefix='qtest-', number=10, from_file=False): exams = [] if not from_file: exams = create_exams() questions = [] question_categories = create_question_categories() if from_file: files = os.listdir('./demo-questions/') for file_name in files: with open('./demo-questions/' + file_name, encoding="UTF-8") as read_obj: print('loading ' + file_name + ' to db') exam_tag = file_name.split('-')[1] exam_prefix = file_name.split('-')[0] exams = create_exams(namePrefix=exam_prefix + '-' + exam_tag, number=6, tags=[exam_tag]) csv_reader = reader(read_obj) for row in csv_reader: qNumber = row[0] if qNumber == '题号' or len(qNumber) < 1: continue content = row[1] # required temp_type = row[2] # required qType = 'DanXuan' if temp_type == '2': qType = 'DuoXuan' if temp_type == '3': qType = 'PanDuan' answers = [] # required for i in [3, 4, 5, 6]: if len(row[i]) > 0: answers.append(row[i]) finalAnswer = row[7] # required answerAnalysis = row[8] keyword = row[9]; level = row[10]; tags = [] if len(row[11]) > 0: if row[11] == '线路工': tags.append('XianLuGong') elif row[11] == '桥隧工': tags.append('QiaoSuiGong') elif row[11] == '大型线路机械司机': tags.append('DaXingXianLuJiXieSiJi') elif row[11] == '测量工': tags.append('CeLiangGong') if len(content) == 0 or len(qType) == 0 or len(answers) == 0 or len(finalAnswer) == 0: print('Skip due to required info missing') continue question = { 'userId': randomly_pick_user(), 'content': content, 'answers': answers, 'finalAnswer': [finalAnswer], 'keyword': keyword, 'type': qType, 'questionLevel': level, 'tags': tags, 'answerAnalysis': answerAnalysis, } resp = create_question(question) random_exam_id = exams[random.randint(0, len(exams)-1)] exam_to_question = { 'questionId': _text(resp)['id'] } # if random number equals to magic number, then add tag 今日必练 if random.randint(0, 100) == 3: add_tag(datetime.today().strftime('%Y%m%d'), questionId=_text(resp)['id']) # add newly created question to a random exam add_question_to_exam(random_exam_id, [exam_to_question]) questions.append(_text(resp)['id']) else: for i in range(number): question = { 'userId': randomly_pick_user(), 'content': namePrefix + '天上什么星星最亮?-' + str(i), 'answers': ['太阳', '小天狼星', '北极星'], 'finalAnswer': ['小天狼星'], 'type': 'DanXuan' # 单选题 } resp = create_question(question) random_exam_id = exams[random.randint(0, len(exams)-1)] # add newly created question to a random exam add_question_to_exam(random_exam_id, [_text(resp)]) questions.append(_text(resp)['id']) tiankong_question = { 'userId': randomly_pick_user(), 'content': namePrefix + '天上$PH$最亮?-' + str(i), 'finalAnswer': ['小天狼星'], 'type': 'TianKong' # 填空题 } resp = create_question(tiankong_question) random_exam_id = exams[random.randint(0, len(exams)-1)] # add newly created question to a random exam add_question_to_exam(random_exam_id, [_text(resp)]) random_category_id = question_categories[random.randint(0, len(question_categories)-1)] add_question_to_question_category(random_category_id, _text(resp)) questions.append(_text(resp)['id']) print ('created ' + str(len(questions)) + ' questions') return questions def create_question_categories(namePrefix= '知识类-', number=3): resp = create_question_category(namePrefix + _random_string(3)) question_root_id = _text(resp)['id'] categories = [] names = ['科普类-', '技术类-', '历史类-', '天文类-', '兴趣类-'] for i in range(number): child = create_question_category(names[i % len(names)] + _random_string(3)) child_id = _text(child)['id'] update_question_category(child_id, { 'parentId': question_root_id }) categories.append(child_id) print ('created ' + str(len(categories)) + ' question categories: ' + ','.join(categories)) return categories def create_categories(namePrefix= 'testCategory-', number=3): resp = create_category(namePrefix + 'root-' + _random_string(3)) root_id = _text(resp)['id'] categories = [] for i in range(number): child = create_category(namePrefix + 'child-' + _random_string(3)) child_id = _text(child)['id'] update_category(child_id, { 'parentId': root_id }) categories.append(child_id) print ('created ' + str(len(categories)) + ' categories: ' + ','.join(categories)) return categories def create_2_levels_categories(root_level, second_levels): resp = create_category(root_level) parent_id = _text(resp)['id'] children = [] for category_name in second_levels: child = create_category(category_name) child_id = _text(child)['id'] update_category(child_id, { 'parentId': parent_id }) children.append(child_id) return children def create_specific_categories(): categories = [] resp = show_root_categories() if len(_text(resp)) == 0: categories.extend(create_2_levels_categories('思政', ['党规党纪', '思想理论', '党史故事'])) categories.extend(create_2_levels_categories('安全', ['劳动安全', '作业安全', '事故案例'])) categories.extend(create_2_levels_categories('理论', ['基础理论', '专业理论'])) categories.extend(create_2_levels_categories('实作', ['作业指导书', '作业技巧', '应急处置'])) categories.extend(create_2_levels_categories('教材', ['文字', '音像', '课件'])) categories.extend(create_2_levels_categories('法纪', ['国家法律', '法纪法规', '交通规则'])) categories.extend(create_2_levels_categories('健康', ['健康知识', '健康养生', '心理辅导'])) categories.extend(create_2_levels_categories('集萃', ['旅游', '摄影','诗词', '名篇'])) else: print('Skip creating specific categories') root_categories = _text(resp) for root_category in root_categories: categories.append(root_category['id']) return categories def read_material_from_file(file_name): file = open('./demo-materials/' + file_name, encoding="UTF-8") file_lines = file.readlines() count = 0 content = '' title = '' for line in file_lines: line = line.strip() if len(line) < 1: continue # whitespaces if count == 0: title = line else: content += line + '\n' count+=1 return title, content def create_learning_materials(namePrefix='测试资料-', number=20, from_file=False): categories = [] if not from_file: categories = create_categories() materials = [] if from_file: files = os.listdir('./demo-materials/') categories = create_specific_categories() for file_name in files: title, content = read_material_from_file(file_name) material = create_material({ 'name': title, 'userId': randomly_pick_user(), 'type': 'ARTICLE', 'contents': content, }) materialId = _text(material)['id'] # if random number equals to magic number, then add tag 今日必练 if random.randint(0, 10) == 3: add_tag(datetime.today().strftime('%Y%m%d'), materialId=materialId) materials.append(materialId) random_category = categories[random.randint(0, len(categories)-1)] add_material_to_category(random_category, _text(material)) create_comments(materialId) else: for i in range(number): user = randomly_pick_user() material = create_material({ 'name': namePrefix + _random_string(5), 'userId': randomly_pick_user(), 'description': '好看的资料', 'type': 'ARTICLE', 'contents': '十年高考三年模拟-' + _random_string(20), 'links': [_random_string(3), _random_string(3), _random_string(3)] }) materialId = _text(material)['id'] materials.append(materialId) random_category = categories[random.randint(0, len(categories)-1)] add_material_to_category(random_category, _text(material)) create_comments(materialId) print ('created ' + str(len(materials)) + ' materials: ' + ','.join(materials)) return materials def create_comments(materialId, namePrefix='[自动生成]', number=5): dummy_comments = ['我去,这也太酷了!', '我太爱学这个了,老牛了', '这太有意思了,收藏了', '好文,顶一个', '我踩'] for i in range(number): comment = { 'userId': randomly_pick_user(), 'content': dummy_comments[i % len(dummy_comments)] } add_comment(materialId, comment) print('created ' + str(number) + ' comments to material: ' + materialId) def register_user(userData): url = WEBSITE_PREFIX + '/user/register' return requests.post(url, json=userData) def add_tag(tag, materialId=None, questionId=None, examId=None): url = WEBSITE_PREFIX + '/tags/' + tag + '?' if materialId: url += 'material=' + materialId if questionId: url += 'question=' + questionId if examId: url += 'exam=' + examId return requests.post(url) def remove_tag(tag, materialId=None, questionId=None, examId=None): url = WEBSITE_PREFIX + '/tags/' + tag + '?' if materialId: url += 'material=' + materialId if questionId: url += 'question=' + questionId if examId: url += 'exam=' + examId return requests.delete(url) def get_tag(tag): url = WEBSITE_PREFIX + '/tags/' + tag return requests.get(url) def delete_all_exams(): url = WEBSITE_PREFIX + '/admin/exams' return requests.delete(url) def delete_all_questions(): url = WEBSITE_PREFIX + '/admin/questions' return requests.delete(url) def delete_all_categories(): url = WEBSITE_PREFIX + '/admin/categories' return requests.delete(url) def delete_all_tags(): url = WEBSITE_PREFIX + '/admin/tags' return requests.delete(url) def delete_all_materials(): url = WEBSITE_PREFIX + '/admin/materials' return requests.delete(url) def delete_all_comments(): url = WEBSITE_PREFIX + '/admin/comments' return requests.delete(url) if override: all_users = json.loads(get_all_users().text) for user in all_users: delete_user(user['userName']) print('deleted all users') delete_all_exams() print('deleted all exams') delete_all_questions() print('deleted all questions') delete_all_categories() print('deleted all categories') delete_all_tags() print('deleted all tags') delete_all_materials() print('deleted all materials') delete_all_comments() print('deleted all comments') if want_user: add_dummy_users() else: print('Skip creating users') if want_question: create_questions(from_file=True) else: print('Skip creating questions') if want_material: create_learning_materials(from_file=True) else: print('Skip creating materials')