Configuring SHELL Type Task Node


Batch Data Processing supports multiple frameworks such as Hive, Spark, MapReduce, etc. When creating a workflow, you can add SHELL task nodes to develop data processing tasks.


This section shows how to configure SHELL task nodes.

Executing HiveSQL Tasks

You can implement batch computing by using SHELL task nodes to execute HiveSQL tasks through the command line.

Command Format

canaanhive [arguments]

Parameter Description

Parameter | Example | Description
-f <arg> | -f demo.sql | The HQL file name.
-d <arg> | -d 2018-01-01 | The time parameter. In this example, the ${env.FORMAT} variables in the HQL file will automatically be replaced according to the formats in the Time Parameter Format table below.
-E <paraN>=<valN> | -E para=abc | A user-defined parameter assigned in the HQL file. In this example, the ${env.para} in the HQL file will automatically be replaced with abc.
-str <sql> | -str "show tables;" | The SQL statement to execute.


Note

The letter P in the FORMAT parameter in the following table stands for Previous, so PnD/PnM/PnY stand for the previous n day(s)/month(s)/year(s).

The letter N stands for Next, so NnD/NnM/NnY stand for the next n day(s)/month(s)/year(s).

Time Parameter Format
FORMAT | Range | Value (with -d 2018-01-01)
YYYYMMDD | | 2018-01-01
YYYYMMDD_PnD | 1 <= n <= 30 | 2017-12-31 ~ 2017-12-02
YYYYMMDD_PnM | 1 <= n <= 12 | 2017-12-01 ~ 2017-01-01
YYYYMMDD_PnY | 1 <= n <= 2 | 2017-01-01 ~ 2016-01-01
YYYYMMDD_NnD | 1 <= n <= 2 | 2018-01-02 ~ 2018-01-03
YYYYMMDD_NnM | 1 <= n <= 2 | 2018-02-01 ~ 2018-03-01
YYYYMMDD_NnY | 1 <= n <= 2 | 2019-01-01 ~ 2020-01-01
YYYYMM | | 2018-01
YYYYMM_PnD | 1 <= n <= 2 | 2017-12 ~ 2017-12
YYYYMM_PnM | 1 <= n <= 2 | 2017-12 ~ 2017-11
YYYYMM_PnY | 1 <= n <= 2 | 2017-01 ~ 2016-01
YYYYMM_NnD | 1 <= n <= 2 | 2018-01 ~ 2018-01
YYYYMM_NnM | 1 <= n <= 2 | 2018-02 ~ 2018-03
YYYYMM_NnY | 1 <= n <= 2 | 2019-01 ~ 2020-01
YYYY | | 2018
YYYY_PnD | 1 <= n <= 2 | 2017 ~ 2017
YYYY_PnM | 1 <= n <= 2 | 2017 ~ 2017
YYYY_PnY | 1 <= n <= 2 | 2017 ~ 2016
YYYY_NnD | 1 <= n <= 2 | 2018 ~ 2018
YYYY_NnM | 1 <= n <= 2 | 2018 ~ 2018
YYYY_NnY | 1 <= n <= 2 | 2019 ~ 2020
MM | | 01
DD | | 01
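
For example, assuming the node runs with -d 2018-01-01 (as in the examples below), the offset formats resolve relative to that date. The following sketch is illustrative only; the comments show how a few of the variables would be replaced before the HQL file is executed:

# Illustrative only: with -d 2018-01-01, these variables are replaced as follows
#   ${env.YYYYMMDD}      -> 2018-01-01
#   ${env.YYYYMMDD_P1D}  -> 2017-12-31   (previous 1 day)
#   ${env.YYYYMM_P1M}    -> 2017-12      (previous 1 month)
#   ${env.YYYY_N1Y}      -> 2019         (next 1 year)
canaanhive -f demo.sql -d 2018-01-01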

Examples

Suppose the command line in the SHELL node is as follows:

canaanhive -f demo.sql -d 2018-01-01 -E DB=demo


And the HQL file demo.sql contains the following sample code:

use ${env.DB};
create table if not exists demo(id string);
insert into demo values('${env.YYYYMMDD}');


The executed content will be:

use demo;
create table if not exists demo(id string);
insert into demo values('2018-01-01');
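
The -str parameter can be used instead of -f to run a single statement without an HQL file. A minimal sketch (the statement is illustrative):

canaanhive -str "show tables;" -d 2018-01-01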

Executing Spark Tasks

You can execute PySpark and Spark tasks through the command line by using SHELL task nodes.

Command Format

Using a PySpark Job as an example, create a SHELL type node and use the SHELL command to run the main function of the Job.

sh predict.sh

Submitting a PySpark Job

submit-pyspark-application [options] <python file> [app arguments]

Parameter Description

Parameter | Description
--python 2.7/3.5 | The Python version; supports 2.7 and 3.5. The default is 2.7.
--pythonEnvPath | The VirtualEnv path in HDFS. If not set, the default Python environment will be used.
--name NAME | The name of your application.
--queue QUEUE_NAME | The YARN queue to submit to (Default: "default").
--num-executors NUM | The number of executors to launch (Default: 2). If dynamic allocation is enabled, the initial number of executors will be at least NUM.
--executor-cores NUM | The number of cores per executor (Default: 1 in YARN mode, or all available cores on the worker in standalone mode).
--driver-cores NUM | The number of cores used by the driver, only in cluster mode (Default: 1).
--conf PROP=VALUE | An arbitrary Spark configuration property.
--py-files PY_FILES | The comma-separated list of .zip, .egg, or .py files to place on the PYTHONPATH for Python apps.
--files FILES | The comma-separated list of files to be placed in the working directory of each executor.
--archives ARCHIVES | The comma-separated list of archives to be extracted into the working directory of each executor.
--driver-memory MEM | The memory for the driver (e.g. 1000M, 2G) (Default: 2G).
--driver-java-options | The extra Java options to pass to the driver.
--driver-library-path | The extra library path entries to pass to the driver.
--driver-class-path | The extra class path entries to pass to the driver. Note that jars added with --jars are automatically included in the classpath.
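
Putting several of these options together, a minimal submit-pyspark-application call might look like the following sketch; the file names, application name, queue, resource sizes, and application argument are illustrative only:

submit-pyspark-application \
    --python 3.5 \
    --name pyspark_wordcount_test \
    --queue default \
    --num-executors 4 \
    --executor-cores 2 \
    --driver-memory 2G \
    --py-files utils.py \
    wordcount.py 2018-01-01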

Submitting a Spark Job

submit-spark-application [options] <app-jar> [app arguments]

Parameter Description

Parameter | Description
--class CLASS_NAME | Your application's main class (for Java / Scala apps).
--name NAME | The name of your application.
--packages | The comma-separated list of Maven coordinates of jars to include on the driver and executor classpaths. The local Maven repository is searched first, then Maven Central and any additional remote repositories given by the --repositories option. The format for the coordinates should be groupId:artifactId:version.
--jars JARS | The comma-separated list of local jars to include on the driver and executor classpaths.
--conf PROP=VALUE | An arbitrary Spark configuration property.
--files FILES | The comma-separated list of files to be placed in the working directory of each executor.
--archives ARCHIVES | The comma-separated list of archives to be extracted into the working directory of each executor.
--driver-memory MEM | The memory for the driver (e.g. 1000M, 2G) (Default: 2G).
--driver-java-options | The extra Java options to pass to the driver.
--driver-library-path | The extra library path entries to pass to the driver.
--driver-class-path | The extra class path entries to pass to the driver. Note that jars added with --jars are automatically included in the classpath.
--executor-cores NUM | The number of cores per executor (Default: 1 in YARN mode, or all available cores on the worker in standalone mode).
--driver-cores NUM | The number of cores used by the driver, only in cluster mode (Default: 1).
--queue QUEUE_NAME | The YARN queue to submit to (Default: "default").
--num-executors NUM | The number of executors to launch (Default: 2). If dynamic allocation is enabled, the initial number of executors will be at least NUM.
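
Similarly, a minimal submit-spark-application call for a Java / Scala application might look like the following sketch; the class, jar, application name, queue, and argument are illustrative only:

submit-spark-application \
    --class com.example.report.DailyReport \
    --name spark_daily_report_test \
    --queue default \
    --num-executors 4 \
    --executor-cores 2 \
    --driver-memory 2G \
    --jars common-utils.jar \
    daily-report.jar 2018-01-01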

Examples

Use the following command in the SHELL node to run the script predict.sh:

sh predict.sh


The example code of predict.sh is as follows:

submit_pyspark_application_func(){
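    # Positional parameters supplied by the caller (see the invocation at the end of this script):
    #   $1 - YARN queue   $2 - HDFS user   $3 - start date   $4 - end date   $5 - comma-separated site IDs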
    submit-pyspark-application \
    --deploy-mode cluster \
    --queue ${1} \
    --name pyspark_predict_test \
    --num-executors 10 \
    --driver-memory 16g \
    --executor-memory 12g \
    --driver-cores 2 \
    --executor-cores 3 \
    --conf spark.eventLog.enabled=true \
    --conf spark.network.timeout=240000 \
    --conf spark.executor.heartbeatInterval=24000 \
    --conf spark.yarn.executor.memoryOverhead=8192 \
    --archives hdfs://user/db_test/userPythonLib.zip#ANACONDA  \
    --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./ANACONDA/MINICONDA/bin/python \
    --conf spark.yarn.maxAppAttempts=1 \
    --conf spark.logger_table=wens_status_algo_running \
    --conf spark.hdfs_user=${2} \
    --conf spark.hdfs_path=hdfs://titan/user/${2} \
    --conf spark.start_date=${3} \
    --conf spark.end_date=${4} \
    --conf spark.site_ids=${5} \
    --conf spark.metric_save_path=/user/${2}/operaphm_temperature/metrics \
    --py-files anomaly.py,hadoop_common_functions.py,layout.py,utm.zip,rle.py,common_tools.py,steadystatefilter.py,math_utils.py \
    predict.py
}

echo "test"


In the above, predict.py is the Python file that is submitted, and it must be located in the same directory as predict.sh.