取出1分鐘線的程式碼
- from ibapi.client import EClient
- from ibapi.wrapper import EWrapper
- from ibapi.contract import Contract
- import threading
- import time
class IBapi(EWrapper, EClient):
    """Combined IB API wrapper/client that collects historical bars in memory."""

    def __init__(self):
        # The instance acts as its own wrapper, so callbacks land on this object.
        EClient.__init__(self, self)
        # One [date, open, high, low, close] row per bar received so far.
        self.data = []

    def historicalData(self, reqId, bar):
        """Print the bar's time and close, then store its OHLC row in ``self.data``."""
        print(f'Time: {bar.date} Close: {bar.close}')
        ohlc_row = [bar.date, bar.open, bar.high, bar.low, bar.close]
        self.data.append(ohlc_row)
-
def run_loop() -> None:
    """Run the module-level ``app`` message loop; used as the thread target."""
    app.run()
# Fetch five days of 1-minute MYM futures bars and save them to a CSV file.
# Imported first so a missing dependency fails before any connection is opened.
import pandas

app = IBapi()
app.connect('127.0.0.1', 7497, 123)

# Start the socket in a thread so this script can keep issuing requests
# while app.run() processes incoming messages.
api_thread = threading.Thread(target=run_loop, daemon=True)
api_thread.start()

try:
    time.sleep(1)  # Sleep interval to allow time for connection to server

    # Create contract object
    contract = Contract()
    contract.symbol = "MYM"
    contract.secType = "FUT"
    contract.exchange = "CBOT"
    contract.currency = "USD"
    contract.lastTradeDateOrContractMonth = "202303"

    # Request historical candles: 5 days of 1-minute TRADES bars.
    # The 8th argument (formatDate=2, per the TWS API docs) asks for bar
    # times as epoch seconds, which is what the conversion below relies on.
    app.reqHistoricalData(1, contract, '', '5 D', '1 min', 'TRADES', 1, 2, False, [])

    # NOTE(review): this is a fixed wait, not a completion signal; a slow
    # response can leave app.data incomplete.
    time.sleep(10)  # sleep to allow enough time for data to be returned

    # Working with Pandas DataFrames
    df = pandas.DataFrame(app.data, columns=['DateTime', 'Open', 'High', 'Low', 'Close'])

    # The bar dates arrive as strings. Convert them to numbers first, because
    # passing strings to to_datetime together with `unit` is deprecated in
    # pandas 2.x and is slated to parse them as datetime strings instead.
    epoch_seconds = pandas.to_numeric(df['DateTime'])
    df['DateTime'] = pandas.to_datetime(epoch_seconds, unit='s')

    df.to_csv('MYM-1min.csv')
    print(df)
finally:
    # Always release the API connection, even if the request or the
    # DataFrame processing above raised.
    app.disconnect()
複製代碼
結果如下
MYM-1min.zip
(26.79 KB, 下載次數: 88)
|