- 자동매매 프로그램 GitHub URL - https://github.com/mujomboy/xingProject.git
- 로그인 클래스 (전체 소스)
더보기
import pythoncom
import win32com.client
from PyQt5.QtCore import Qt
from PyQt5.QtWidgets import QWidget, QLabel, QHBoxLayout, QLineEdit, QPushButton, QRadioButton
from admin import Conn
class Login(QWidget):
def __init__(self):
super().__init__()
print("INIT LOGIN")
self.layout_main = QHBoxLayout(self) # 메인레이아웃
lbl_id = QLabel("ID ") # 아이디 라벨 생성
lbl_pw = QLabel("PW ") # 패스워드 라벨 생성
lbl_port = QLabel("PORT ") # 포트 라벨 생성
lbl_cert = QLabel("CERTIFICATION ") # 인증서 라벨 생성
self.txt_id = QLineEdit("") # 아이디 에디트 생성
self.txt_pw = QLineEdit("") # 패스워드 에디트 생성
self.txt_port = QLineEdit("200001") # 포트 에디트 생성
self.txt_cert = QLineEdit() # 인증서 에디트 생성
self.rdo_real = QRadioButton("실제투자")
self.rdo_vir = QRadioButton("모의투자")
self.rdo_vir.setChecked(True)
btn_login = QPushButton("LOGIN") # 로그인 버튼
lbl_id.setAlignment(Qt.AlignCenter)
lbl_pw.setAlignment(Qt.AlignCenter)
lbl_port.setAlignment(Qt.AlignCenter)
lbl_cert.setAlignment(Qt.AlignCenter)
# 위젯들 레이아웃에 연결
self.layout_main.addWidget(lbl_id)
self.layout_main.addWidget(self.txt_id)
self.layout_main.addWidget(lbl_pw)
self.layout_main.addWidget(self.txt_pw)
self.layout_main.addWidget(lbl_port)
self.layout_main.addWidget(self.txt_port)
self.layout_main.addWidget(lbl_cert)
self.layout_main.addWidget(self.txt_cert)
self.layout_main.addWidget(self.rdo_real)
self.layout_main.addWidget(self.rdo_vir)
self.layout_main.addWidget(btn_login)
# 버튼 클릭 시 호출할 함수 연결
btn_login.clicked.connect(self.login_clicked)
# 세션 객체 받아올 변수
self.session = None
# 세션 생성
def create_XASession(self):
Conn().get_msg().add_msg(self, "Check XASession state")
# 세션 생성
if self.session is None:
Conn().get_msg().add_msg(self, "Create XASession")
# 세션 객체 요청 및 객체 등록
self.session = win32com.client.DispatchWithEvents("XA_Session.XASession", SessionEvents)
# 서버 연결 여부 확인
elif self.session.IsConnected():
Conn().get_msg().add_msg(self, "Server is Connected")
# 서버 끊기
self.disconnect_server()
# 서버 끊기
def disconnect_server(self):
Conn().get_msg().add_msg(self, 'Disconnected server')
self.session.DisconnectServer()
SessionEvents.state = ""
SessionEvents.msg = ""
# 서버 연결
def connect_server(self):
Conn().get_msg().add_msg(self, 'Request server connect')
# 접속 URL
url = 'demo.ebestsec.co.kr' # 모의 투자
if self.rdo_real.isChecked():
url = 'hts.ebestsec.co.kr' # 실제 투자
Conn().get_msg().add_msg(self, "URL : " + url)
Conn().get_msg().add_msg(self, "PORT : " + self.txt_port.text())
# 서버 연결 요청 (URL, 포트 번호 전달)
result = self.session.ConnectServer(url, int(self.txt_port.text()))
# 연결 여부 확인
if not result:
# 연결 실패 시
info = self.get_error_info()
Conn().get_msg().add_msg(self, "Connect Failed")
Conn().get_msg().add_msg(self, "Error Code : " + info[0])
Conn().get_msg().add_msg(self, "Error Msg : " + info[1])
if self.session.IsConnected():
self.disconnect_server()
return False
Conn().get_msg().add_msg(self, "Server Connect Success")
return True
# 로그인 진행
def login(self):
Conn().get_msg().add_msg(self, "Login Attempt")
# 이베스트 로그인
Conn().get_msg().add_msg(self, "ID : " + self.txt_id.text())
Conn().get_msg().add_msg(self, "PWD : " + self.txt_pw.text())
Conn().get_msg().add_msg(self, "CERT : " + self.txt_cert.text())
# 세션 로그인 요청
self.session.Login(self.txt_id.text(), self.txt_pw.text(), self.txt_cert.text(), 0, 0)
Conn().get_msg().add_msg(self, 'Wait....')
while SessionEvents.state == "":
# 로그인 상태 변경 메시지 채크
pythoncom.PumpWaitingMessages()
if SessionEvents.state == "0000":
Conn().get_msg().add_msg(self, 'Login Success')
else:
Conn().get_msg().add_msg(self, "Login Failed")
Conn().get_msg().add_msg(self, "Error Code : " + SessionEvents.state)
Conn().get_msg().add_msg(self, "Error Msg : " + SessionEvents.msg)
Conn().get_msg().add_msg(self, "========== END =============\n")
def login_clicked(self):
# 세션 생성
self.create_XASession()
# 서버 연결
if not self.connect_server():
return
# 로그인 진행
self.login()
for i in range(self.session.GetAccountListCount()):
Conn().get_msg().add_msg(self, self.session.GetAccountList(i))
def get_error_info(self):
code = Conn().get_session().GetLastError()
return [code, self.session.GetErrorMessage(code)]
# 세션 클래스
class SessionEvents:
state = ""
msg = ""
def OnLogin(self, code, msg):
SessionEvents.state = code
SessionEvents.msg = msg
def OnLogout(self):
print("OnLogout")
def OnDisconnect(self):
print("OnDisconnect")
- 로그인을 위해 필요한 UI 생성
lbl_id = QLabel("ID ") # 아이디 라벨 생성
lbl_pw = QLabel("PW ") # 패스워드 라벨 생성
lbl_port = QLabel("PORT ") # 포트 라벨 생성
lbl_cert = QLabel("CERTIFICATION ") # 인증서 라벨 생성
self.txt_id = QLineEdit("") # 아이디 에디트 생성
self.txt_pw = QLineEdit("") # 패스워드 에디트 생성
self.txt_port = QLineEdit("200001") # 포트 에디트 생성
self.txt_cert = QLineEdit() # 인증서 에디트 생성
self.rdo_real = QRadioButton("실제투자")
self.rdo_vir = QRadioButton("모의투자")
btn_login = QPushButton("LOGIN") # 로그인 버튼
- 로그인 버튼 클릭 시 동작 설명
def login_clicked(self):
# 1. 세션 생성 여부를 확인한다.
# 2-1. 미생성 시 세션을 생성 합니다.
# 2-2. 기존에 세션이 생성되어 있다면 서버 연결 여부를 확인합니다.
# 2-2-1. 서버가 연결되어 있다면 서버를 끊습니다.
self.create_XASession()
# 1. 접속할 url 주소 확인
# 2. 서버 연결 요청
# 3. 연결 실패 시 실패 메시지 출력 후 리턴
if not self.connect_server():
return
# 1. 세션에 로그인 정보 전달 및 요청
# 2. 로그인 요청에 대한 답이 올때까지 대기
# 3. 로그인 성공 여부 확인
self.login()
# 세션에서 계좌 정보 리스트 가져와서 출력
for i in range(self.session.GetAccountListCount()):
Conn().get_msg().add_msg(self, self.session.GetAccountList(i))
'xingAPI > 자동 매매 프로그램 만들기 (PYQT5)' 카테고리의 다른 글
[xingAPI, 파이썬, PYQT5 - QtChart] 주식 이동평균선 만들기 (5) | 2022.08.20 |
---|---|
[xingAPI, 파이썬, PYQT5 - QtChart] 주식 봉차트(캔들차트) 만들기 (0) | 2022.07.29 |
[xingAPI, 파이썬] 주식차트(N분) 데이터 조회하기 (0) | 2022.07.08 |